You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/29 17:22:22 UTC

[airflow] 01/02: Fix the default value for store_dag_code (#9554)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a23f48d4b99c3e1d1d5e1e6fe70ed0f229b5ccc9
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon Jun 29 12:46:06 2020 +0100

    Fix the default value for store_dag_code (#9554)
    
    Related to #8255 (fixes the issue reported for `store_dag_code`, but does not address config interpolation)
    
    The default value of `store_dag_code` should be the same as the `store_serialized_dags` setting. However, if `store_dag_code` is set explicitly, that value should be used instead.
    
    (cherry picked from commit 57c722b65c1ddfe527924448291f29ff7036ad0a)
---
 airflow/config_templates/config.yml          |  4 ++--
 airflow/config_templates/default_airflow.cfg |  3 ++-
 airflow/models/dag.py                        |  2 +-
 airflow/models/dagcode.py                    |  4 ++--
 airflow/settings.py                          |  5 +++++
 airflow/utils/dag_processing.py              |  4 ++--
 tests/test_configuration.py                  | 20 ++++++++++++++++++++
 7 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 61491d8..e32b8cc 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -445,8 +445,8 @@
         ``store_serialized_dags`` setting.
       version_added: 1.10.10
       type: string
-      example: ~
-      default: "%(store_serialized_dags)s"
+      example: "False"
+      default: ~
     - name: max_num_rendered_ti_fields_per_task
       description: |
         Maximum number of Rendered Task Instance Fields (Template Fields) per task to store
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 2cc97e2..c75d3ae 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -232,7 +232,8 @@ min_serialized_dag_update_interval = 30
 # If set to True, Webserver reads file contents from DB instead of
 # trying to access files in a DAG folder. Defaults to same as the
 # ``store_serialized_dags`` setting.
-store_dag_code = %(store_serialized_dags)s
+# Example: store_dag_code = False
+# store_dag_code =
 
 # Maximum number of Rendered Task Instance Fields (Template Fields) per task to store
 # in the Database.
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 7759cb3..933dc10 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1529,7 +1529,7 @@ class DAG(BaseDag, LoggingMixin):
         orm_dag.schedule_interval = self.schedule_interval
         orm_dag.tags = self.get_dagtags(session=session)
 
-        if conf.getboolean('core', 'store_dag_code', fallback=False):
+        if settings.STORE_DAG_CODE:
             DagCode.bulk_sync_to_db([orm_dag.fileloc])
 
         session.commit()
diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py
index 513ec18..6aa7b6a 100644
--- a/airflow/models/dagcode.py
+++ b/airflow/models/dagcode.py
@@ -21,9 +21,9 @@ from datetime import datetime
 
 from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
 
-from airflow.configuration import conf
 from airflow.exceptions import AirflowException, DagCodeNotFound
 from airflow.models import Base
+from airflow.settings import STORE_DAG_CODE
 from airflow.utils import timezone
 from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
 from airflow.utils.db import provide_session
@@ -178,7 +178,7 @@ class DagCode(Base):
 
         :return: source code as string
         """
-        if conf.getboolean('core', 'store_dag_code', fallback=False):
+        if STORE_DAG_CODE:
             return cls._get_code_from_db(fileloc)
         else:
             return cls._get_code_from_file(fileloc)
diff --git a/airflow/settings.py b/airflow/settings.py
index 513f192..c86d4b2 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -401,6 +401,11 @@ STORE_SERIALIZED_DAGS = conf.getboolean('core', 'store_serialized_dags', fallbac
 MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint(
     'core', 'min_serialized_dag_update_interval', fallback=30)
 
+# Whether to persist DAG files code in DB. If set to True, Webserver reads file contents
+# from DB instead of trying to access files in a DAG folder.
+# Defaults to same as the store_serialized_dags setting.
+STORE_DAG_CODE = conf.getboolean("core", "store_dag_code", fallback=STORE_SERIALIZED_DAGS)
+
 # If donot_modify_handlers=True, we do not modify logging handlers in task_run command
 # If the flag is set to False, we remove all handlers from the root logger
 # and add all handlers from 'airflow.task' logger to the root Logger. This is done
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 6e4e045..3aac8fd 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -50,7 +50,7 @@ from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.exceptions import AirflowException
 from airflow.settings import Stats
 from airflow.models import errors
-from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.settings import STORE_DAG_CODE, STORE_SERIALIZED_DAGS
 from airflow.utils import timezone
 from airflow.utils.helpers import reap_process_group
 from airflow.utils.db import provide_session
@@ -914,7 +914,7 @@ class DagFileProcessorManager(LoggingMixin):
                 SerializedDagModel.remove_deleted_dags(self._file_paths)
                 DagModel.deactivate_deleted_dags(self._file_paths)
 
-            if conf.getboolean('core', 'store_dag_code', fallback=False):
+            if STORE_DAG_CODE:
                 from airflow.models.dagcode import DagCode
                 DagCode.remove_deleted_code(self._file_paths)
 
diff --git a/tests/test_configuration.py b/tests/test_configuration.py
index af1df69..5c40cad 100644
--- a/tests/test_configuration.py
+++ b/tests/test_configuration.py
@@ -32,6 +32,7 @@ import six
 from airflow import configuration
 from airflow.configuration import conf, AirflowConfigParser, parameterized_config
 from tests.compat import mock
+from tests.test_utils.config import conf_vars
 from tests.test_utils.reset_warning_registry import reset_warning_registry
 
 if six.PY2:
@@ -494,3 +495,22 @@ notacommand = OK
             conf.write(string_file)
             content = string_file.getvalue()
         self.assertIn("dags_folder = /tmp/test_folder", content)
+
+    @conf_vars({("core", "store_serialized_dags"): "True"})
+    def test_store_dag_code_default_config(self):
+        store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False)
+        store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags)
+        self.assertFalse(conf.has_option("core", "store_dag_code"))
+        self.assertTrue(store_serialized_dags)
+        self.assertTrue(store_dag_code)
+
+    @conf_vars({
+        ("core", "store_serialized_dags"): "True",
+        ("core", "store_dag_code"): "False"
+    })
+    def test_store_dag_code_config_when_set(self):
+        store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False)
+        store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags)
+        self.assertTrue(conf.has_option("core", "store_dag_code"))
+        self.assertTrue(store_serialized_dags)
+        self.assertFalse(store_dag_code)