You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/29 17:22:22 UTC
[airflow] 01/02: Fix the default value for store_dag_code (#9554)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit a23f48d4b99c3e1d1d5e1e6fe70ed0f229b5ccc9
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon Jun 29 12:46:06 2020 +0100
Fix the default value for store_dag_code (#9554)
Related to #8255 (fixes the `store_dag_code` issue reported there, but does not address config interpolation)
The default value of `store_dag_code` should be the same as the `store_serialized_dags` setting. However, if `store_dag_code` is set explicitly, that value should be used.
(cherry picked from commit 57c722b65c1ddfe527924448291f29ff7036ad0a)
---
airflow/config_templates/config.yml | 4 ++--
airflow/config_templates/default_airflow.cfg | 3 ++-
airflow/models/dag.py | 2 +-
airflow/models/dagcode.py | 4 ++--
airflow/settings.py | 5 +++++
airflow/utils/dag_processing.py | 4 ++--
tests/test_configuration.py | 20 ++++++++++++++++++++
7 files changed, 34 insertions(+), 8 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 61491d8..e32b8cc 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -445,8 +445,8 @@
``store_serialized_dags`` setting.
version_added: 1.10.10
type: string
- example: ~
- default: "%(store_serialized_dags)s"
+ example: "False"
+ default: ~
- name: max_num_rendered_ti_fields_per_task
description: |
Maximum number of Rendered Task Instance Fields (Template Fields) per task to store
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 2cc97e2..c75d3ae 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -232,7 +232,8 @@ min_serialized_dag_update_interval = 30
# If set to True, Webserver reads file contents from DB instead of
# trying to access files in a DAG folder. Defaults to same as the
# ``store_serialized_dags`` setting.
-store_dag_code = %(store_serialized_dags)s
+# Example: store_dag_code = False
+# store_dag_code =
# Maximum number of Rendered Task Instance Fields (Template Fields) per task to store
# in the Database.
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 7759cb3..933dc10 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1529,7 +1529,7 @@ class DAG(BaseDag, LoggingMixin):
orm_dag.schedule_interval = self.schedule_interval
orm_dag.tags = self.get_dagtags(session=session)
- if conf.getboolean('core', 'store_dag_code', fallback=False):
+ if settings.STORE_DAG_CODE:
DagCode.bulk_sync_to_db([orm_dag.fileloc])
session.commit()
diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py
index 513ec18..6aa7b6a 100644
--- a/airflow/models/dagcode.py
+++ b/airflow/models/dagcode.py
@@ -21,9 +21,9 @@ from datetime import datetime
from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
-from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagCodeNotFound
from airflow.models import Base
+from airflow.settings import STORE_DAG_CODE
from airflow.utils import timezone
from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
from airflow.utils.db import provide_session
@@ -178,7 +178,7 @@ class DagCode(Base):
:return: source code as string
"""
- if conf.getboolean('core', 'store_dag_code', fallback=False):
+ if STORE_DAG_CODE:
return cls._get_code_from_db(fileloc)
else:
return cls._get_code_from_file(fileloc)
diff --git a/airflow/settings.py b/airflow/settings.py
index 513f192..c86d4b2 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -401,6 +401,11 @@ STORE_SERIALIZED_DAGS = conf.getboolean('core', 'store_serialized_dags', fallbac
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint(
'core', 'min_serialized_dag_update_interval', fallback=30)
+# Whether to persist DAG files code in DB. If set to True, Webserver reads file contents
+# from DB instead of trying to access files in a DAG folder.
+# Defaults to same as the store_serialized_dags setting.
+STORE_DAG_CODE = conf.getboolean("core", "store_dag_code", fallback=STORE_SERIALIZED_DAGS)
+
# If donot_modify_handlers=True, we do not modify logging handlers in task_run command
# If the flag is set to False, we remove all handlers from the root logger
# and add all handlers from 'airflow.task' logger to the root Logger. This is done
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 6e4e045..3aac8fd 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -50,7 +50,7 @@ from airflow.dag.base_dag import BaseDag, BaseDagBag
from airflow.exceptions import AirflowException
from airflow.settings import Stats
from airflow.models import errors
-from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.settings import STORE_DAG_CODE, STORE_SERIALIZED_DAGS
from airflow.utils import timezone
from airflow.utils.helpers import reap_process_group
from airflow.utils.db import provide_session
@@ -914,7 +914,7 @@ class DagFileProcessorManager(LoggingMixin):
SerializedDagModel.remove_deleted_dags(self._file_paths)
DagModel.deactivate_deleted_dags(self._file_paths)
- if conf.getboolean('core', 'store_dag_code', fallback=False):
+ if STORE_DAG_CODE:
from airflow.models.dagcode import DagCode
DagCode.remove_deleted_code(self._file_paths)
diff --git a/tests/test_configuration.py b/tests/test_configuration.py
index af1df69..5c40cad 100644
--- a/tests/test_configuration.py
+++ b/tests/test_configuration.py
@@ -32,6 +32,7 @@ import six
from airflow import configuration
from airflow.configuration import conf, AirflowConfigParser, parameterized_config
from tests.compat import mock
+from tests.test_utils.config import conf_vars
from tests.test_utils.reset_warning_registry import reset_warning_registry
if six.PY2:
@@ -494,3 +495,22 @@ notacommand = OK
conf.write(string_file)
content = string_file.getvalue()
self.assertIn("dags_folder = /tmp/test_folder", content)
+
+ @conf_vars({("core", "store_serialized_dags"): "True"})
+ def test_store_dag_code_default_config(self):
+ store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False)
+ store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags)
+ self.assertFalse(conf.has_option("core", "store_dag_code"))
+ self.assertTrue(store_serialized_dags)
+ self.assertTrue(store_dag_code)
+
+ @conf_vars({
+ ("core", "store_serialized_dags"): "True",
+ ("core", "store_dag_code"): "False"
+ })
+ def test_store_dag_code_config_when_set(self):
+ store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False)
+ store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags)
+ self.assertTrue(conf.has_option("core", "store_dag_code"))
+ self.assertTrue(store_serialized_dags)
+ self.assertFalse(store_dag_code)