You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/11/29 16:54:40 UTC

[airflow] branch main updated: Add cli command for 'airflow dags reserialize` (#19471)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c4e8959  Add cli command for 'airflow dags reserialize` (#19471)
c4e8959 is described below

commit c4e8959d141512226a994baeea74d5c7e643c598
Author: Collin McNulty <co...@gmail.com>
AuthorDate: Mon Nov 29 10:54:14 2021 -0600

    Add cli command for 'airflow dags reserialize` (#19471)
---
 airflow/cli/cli_parser.py              | 18 ++++++++++++++++++
 airflow/cli/commands/dag_command.py    | 12 ++++++++++++
 docs/spelling_wordlist.txt             |  3 +++
 tests/cli/commands/test_dag_command.py | 22 ++++++++++++++++++++++
 4 files changed, 55 insertions(+)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 5fbd043..62943ee 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -769,6 +769,13 @@ ARG_CAPACITY = Arg(
     help="The maximum number of triggers that a Triggerer will run at one time.",
 )
 
+# reserialize (options for the "airflow dags reserialize" command)
+ARG_CLEAR_ONLY = Arg(
+    ("--clear-only",),
+    action="store_true",
+    help="If passed, serialized DAGs will be cleared but not reserialized.",
+)
+
 ALTERNATIVE_CONN_SPECS_ARGS = [
     ARG_CONN_TYPE,
     ARG_CONN_DESCRIPTION,
@@ -977,6 +984,17 @@ DAGS_COMMANDS = (
             ARG_SAVE_DAGRUN,
         ),
     ),
+    ActionCommand(
+        name='reserialize',
+        help="Reserialize all DAGs by parsing the DagBag files",
+        description=(
+            "Drop all serialized dags from the metadata DB. This will cause all DAGs to be reserialized "
+            "from the DagBag folder. This can be helpful if your serialized DAGs get out of sync with the "
+            "version of Airflow that you are running."
+        ),
+        func=lazy_load_command('airflow.cli.commands.dag_command.dag_reserialize'),
+        args=(ARG_CLEAR_ONLY,),
+    ),
 )
 TASKS_COMMANDS = (
     ActionCommand(
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index b94d6cf..4e81d9d 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -37,6 +37,7 @@ from airflow.executors.debug_executor import DebugExecutor
 from airflow.jobs.base_job import BaseJob
 from airflow.models import DagBag, DagModel, DagRun, TaskInstance
 from airflow.models.dag import DAG
+from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import (
     get_dag,
@@ -441,3 +442,14 @@ def dag_test(args, session=None):
             _display_dot_via_imgcat(dot_graph)
         if show_dagrun:
             print(dot_graph.source)
+
+
+@provide_session
+@cli_utils.action_logging
+def dag_reserialize(args, session=None):
+    """Delete all serialized DAGs and, unless --clear-only is set, collect and sync them again."""
+    session.query(SerializedDagModel).delete(synchronize_session=False)
+    if not args.clear_only:
+        dagbag = DagBag()
+        dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+        dagbag.sync_to_db(session=session)  # reuse the session so delete + re-sync commit together
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index c7b95bc..4703bd2 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -302,6 +302,7 @@ Redhat
 ReidentifyContentResponse
 Reinitialising
 Remoting
+Reserialize
 ResourceRequirements
 Roadmap
 Robinhood
@@ -1167,6 +1168,8 @@ replicaSet
 repo
 repos
 reqs
+reserialize
+reserialized
 resetdb
 resourceVersion
 resultset
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 0044761..88766bc 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -30,6 +30,7 @@ from airflow.cli import cli_parser
 from airflow.cli.commands import dag_command
 from airflow.exceptions import AirflowException
 from airflow.models import DagBag, DagModel, DagRun
+from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State
@@ -62,6 +63,27 @@ class TestCliDags(unittest.TestCase):
         clear_db_runs()
         clear_db_dags()
 
+    def test_reserialize(self):
+        # Precondition: the metadata DB already holds serialized DAGs to delete
+        with create_session() as session:
+            rows_before = session.query(SerializedDagModel).all()
+        assert rows_before
+
+        # With --clear-only the serialized DAGs are removed and not re-created
+        clear_args = self.parser.parse_args(['dags', 'reserialize', "--clear-only"])
+        dag_command.dag_reserialize(clear_args)
+        with create_session() as session:
+            rows_after_clear = session.query(SerializedDagModel).all()
+        assert not rows_after_clear
+
+        # Without the flag the command writes the serialized DAGs back
+        reserialize_args = self.parser.parse_args(['dags', 'reserialize'])
+        dag_command.dag_reserialize(reserialize_args)
+
+        with create_session() as session:
+            rows_after_reserialize = session.query(SerializedDagModel).all()
+        assert len(rows_after_reserialize) >= 40
+
     @mock.patch("airflow.cli.commands.dag_command.DAG.run")
     def test_backfill(self, mock_run):
         dag_command.dag_backfill(