You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/11/29 16:54:40 UTC
[airflow] branch main updated: Add cli command for 'airflow dags reserialize' (#19471)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c4e8959 Add cli command for 'airflow dags reserialize' (#19471)
c4e8959 is described below
commit c4e8959d141512226a994baeea74d5c7e643c598
Author: Collin McNulty <co...@gmail.com>
AuthorDate: Mon Nov 29 10:54:14 2021 -0600
Add cli command for 'airflow dags reserialize' (#19471)
---
airflow/cli/cli_parser.py | 18 ++++++++++++++++++
airflow/cli/commands/dag_command.py | 12 ++++++++++++
docs/spelling_wordlist.txt | 3 +++
tests/cli/commands/test_dag_command.py | 22 ++++++++++++++++++++++
4 files changed, 55 insertions(+)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 5fbd043..62943ee 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -769,6 +769,13 @@ ARG_CAPACITY = Arg(
help="The maximum number of triggers that a Triggerer will run at one time.",
)
+# reserialize
+ARG_CLEAR_ONLY = Arg(
+ ("--clear-only",),
+ action="store_true",
+ help="If passed, serialized DAGs will be cleared but not reserialized.",
+)
+
ALTERNATIVE_CONN_SPECS_ARGS = [
ARG_CONN_TYPE,
ARG_CONN_DESCRIPTION,
@@ -977,6 +984,17 @@ DAGS_COMMANDS = (
ARG_SAVE_DAGRUN,
),
),
+ ActionCommand(
+ name='reserialize',
+ help="Reserialize all DAGs by parsing the DagBag files",
+ description=(
+ "Drop all serialized dags from the metadata DB. This will cause all DAGs to be reserialized "
+ "from the DagBag folder. This can be helpful if your serialized DAGs get out of sync with the "
+ "version of Airflow that you are running."
+ ),
+ func=lazy_load_command('airflow.cli.commands.dag_command.dag_reserialize'),
+ args=(ARG_CLEAR_ONLY,),
+ ),
)
TASKS_COMMANDS = (
ActionCommand(
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index b94d6cf..4e81d9d 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -37,6 +37,7 @@ from airflow.executors.debug_executor import DebugExecutor
from airflow.jobs.base_job import BaseJob
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
from airflow.models.dag import DAG
+from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import cli as cli_utils
from airflow.utils.cli import (
get_dag,
@@ -441,3 +442,14 @@ def dag_test(args, session=None):
_display_dot_via_imgcat(dot_graph)
if show_dagrun:
print(dot_graph.source)
+
+
+@provide_session
+@cli_utils.action_logging
+def dag_reserialize(args, session=None):
+ session.query(SerializedDagModel).delete(synchronize_session=False)
+
+ if not args.clear_only:
+ dagbag = DagBag()
+ dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+ dagbag.sync_to_db()
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index c7b95bc..4703bd2 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -302,6 +302,7 @@ Redhat
ReidentifyContentResponse
Reinitialising
Remoting
+Reserialize
ResourceRequirements
Roadmap
Robinhood
@@ -1167,6 +1168,8 @@ replicaSet
repo
repos
reqs
+reserialize
+reserialized
resetdb
resourceVersion
resultset
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 0044761..88766bc 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -30,6 +30,7 @@ from airflow.cli import cli_parser
from airflow.cli.commands import dag_command
from airflow.exceptions import AirflowException
from airflow.models import DagBag, DagModel, DagRun
+from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
@@ -62,6 +63,27 @@ class TestCliDags(unittest.TestCase):
clear_db_runs()
clear_db_dags()
+ def test_reserialize(self):
+ # Assert that there are serialized Dags
+ with create_session() as session:
+ serialized_dags_before_command = session.query(SerializedDagModel).all()
+ assert len(serialized_dags_before_command) # There are serialized DAGs to delete
+
+ # Run clear of serialized dags
+ dag_command.dag_reserialize(self.parser.parse_args(['dags', 'reserialize', "--clear-only"]))
+ # Assert no serialized Dags
+ with create_session() as session:
+ serialized_dags_after_clear = session.query(SerializedDagModel).all()
+ assert not len(serialized_dags_after_clear)
+
+ # Serialize manually
+ dag_command.dag_reserialize(self.parser.parse_args(['dags', 'reserialize']))
+
+ # Check serialized DAGs are back
+ with create_session() as session:
+ serialized_dags_after_reserialize = session.query(SerializedDagModel).all()
+ assert len(serialized_dags_after_reserialize) >= 40 # Serialized DAGs back
+
@mock.patch("airflow.cli.commands.dag_command.DAG.run")
def test_backfill(self, mock_run):
dag_command.dag_backfill(