You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/29 17:22:21 UTC

[airflow] branch v1-10-test updated (eae07aa -> 0451378)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from eae07aa  Add support for fetching logs from running pods (#8626)
     new a23f48d  Fix the default value for store_dag_code (#9554)
     new 0451378  Fix failing test in DagCode (#9565)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/config_templates/config.yml          |  4 ++--
 airflow/config_templates/default_airflow.cfg |  3 ++-
 airflow/models/dag.py                        |  2 +-
 airflow/models/dagcode.py                    |  4 ++--
 airflow/settings.py                          |  5 +++++
 airflow/utils/dag_processing.py              |  4 ++--
 tests/models/test_dagcode.py                 |  6 ++++++
 tests/test_configuration.py                  | 20 ++++++++++++++++++++
 8 files changed, 40 insertions(+), 8 deletions(-)


[airflow] 01/02: Fix the default value for store_dag_code (#9554)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a23f48d4b99c3e1d1d5e1e6fe70ed0f229b5ccc9
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon Jun 29 12:46:06 2020 +0100

    Fix the default value for store_dag_code (#9554)
    
    Related to #8255 (fixes the issue reported for `store_dag_code`, but does not address config interpolation)
    
    The default value of `store_dag_code` should be the same as the `store_serialized_dags` setting. However, if `store_dag_code` is set explicitly, that value should be used instead.
    
    (cherry picked from commit 57c722b65c1ddfe527924448291f29ff7036ad0a)
---
 airflow/config_templates/config.yml          |  4 ++--
 airflow/config_templates/default_airflow.cfg |  3 ++-
 airflow/models/dag.py                        |  2 +-
 airflow/models/dagcode.py                    |  4 ++--
 airflow/settings.py                          |  5 +++++
 airflow/utils/dag_processing.py              |  4 ++--
 tests/test_configuration.py                  | 20 ++++++++++++++++++++
 7 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 61491d8..e32b8cc 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -445,8 +445,8 @@
         ``store_serialized_dags`` setting.
       version_added: 1.10.10
       type: string
-      example: ~
-      default: "%(store_serialized_dags)s"
+      example: "False"
+      default: ~
     - name: max_num_rendered_ti_fields_per_task
       description: |
         Maximum number of Rendered Task Instance Fields (Template Fields) per task to store
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 2cc97e2..c75d3ae 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -232,7 +232,8 @@ min_serialized_dag_update_interval = 30
 # If set to True, Webserver reads file contents from DB instead of
 # trying to access files in a DAG folder. Defaults to same as the
 # ``store_serialized_dags`` setting.
-store_dag_code = %(store_serialized_dags)s
+# Example: store_dag_code = False
+# store_dag_code =
 
 # Maximum number of Rendered Task Instance Fields (Template Fields) per task to store
 # in the Database.
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 7759cb3..933dc10 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1529,7 +1529,7 @@ class DAG(BaseDag, LoggingMixin):
         orm_dag.schedule_interval = self.schedule_interval
         orm_dag.tags = self.get_dagtags(session=session)
 
-        if conf.getboolean('core', 'store_dag_code', fallback=False):
+        if settings.STORE_DAG_CODE:
             DagCode.bulk_sync_to_db([orm_dag.fileloc])
 
         session.commit()
diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py
index 513ec18..6aa7b6a 100644
--- a/airflow/models/dagcode.py
+++ b/airflow/models/dagcode.py
@@ -21,9 +21,9 @@ from datetime import datetime
 
 from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
 
-from airflow.configuration import conf
 from airflow.exceptions import AirflowException, DagCodeNotFound
 from airflow.models import Base
+from airflow.settings import STORE_DAG_CODE
 from airflow.utils import timezone
 from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
 from airflow.utils.db import provide_session
@@ -178,7 +178,7 @@ class DagCode(Base):
 
         :return: source code as string
         """
-        if conf.getboolean('core', 'store_dag_code', fallback=False):
+        if STORE_DAG_CODE:
             return cls._get_code_from_db(fileloc)
         else:
             return cls._get_code_from_file(fileloc)
diff --git a/airflow/settings.py b/airflow/settings.py
index 513f192..c86d4b2 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -401,6 +401,11 @@ STORE_SERIALIZED_DAGS = conf.getboolean('core', 'store_serialized_dags', fallbac
 MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint(
     'core', 'min_serialized_dag_update_interval', fallback=30)
 
+# Whether to persist DAG files code in DB. If set to True, Webserver reads file contents
+# from DB instead of trying to access files in a DAG folder.
+# Defaults to same as the store_serialized_dags setting.
+STORE_DAG_CODE = conf.getboolean("core", "store_dag_code", fallback=STORE_SERIALIZED_DAGS)
+
 # If donot_modify_handlers=True, we do not modify logging handlers in task_run command
 # If the flag is set to False, we remove all handlers from the root logger
 # and add all handlers from 'airflow.task' logger to the root Logger. This is done
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 6e4e045..3aac8fd 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -50,7 +50,7 @@ from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.exceptions import AirflowException
 from airflow.settings import Stats
 from airflow.models import errors
-from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.settings import STORE_DAG_CODE, STORE_SERIALIZED_DAGS
 from airflow.utils import timezone
 from airflow.utils.helpers import reap_process_group
 from airflow.utils.db import provide_session
@@ -914,7 +914,7 @@ class DagFileProcessorManager(LoggingMixin):
                 SerializedDagModel.remove_deleted_dags(self._file_paths)
                 DagModel.deactivate_deleted_dags(self._file_paths)
 
-            if conf.getboolean('core', 'store_dag_code', fallback=False):
+            if STORE_DAG_CODE:
                 from airflow.models.dagcode import DagCode
                 DagCode.remove_deleted_code(self._file_paths)
 
diff --git a/tests/test_configuration.py b/tests/test_configuration.py
index af1df69..5c40cad 100644
--- a/tests/test_configuration.py
+++ b/tests/test_configuration.py
@@ -32,6 +32,7 @@ import six
 from airflow import configuration
 from airflow.configuration import conf, AirflowConfigParser, parameterized_config
 from tests.compat import mock
+from tests.test_utils.config import conf_vars
 from tests.test_utils.reset_warning_registry import reset_warning_registry
 
 if six.PY2:
@@ -494,3 +495,22 @@ notacommand = OK
             conf.write(string_file)
             content = string_file.getvalue()
         self.assertIn("dags_folder = /tmp/test_folder", content)
+
+    @conf_vars({("core", "store_serialized_dags"): "True"})
+    def test_store_dag_code_default_config(self):
+        store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False)
+        store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags)
+        self.assertFalse(conf.has_option("core", "store_dag_code"))
+        self.assertTrue(store_serialized_dags)
+        self.assertTrue(store_dag_code)
+
+    @conf_vars({
+        ("core", "store_serialized_dags"): "True",
+        ("core", "store_dag_code"): "False"
+    })
+    def test_store_dag_code_config_when_set(self):
+        store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False)
+        store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags)
+        self.assertTrue(conf.has_option("core", "store_dag_code"))
+        self.assertTrue(store_serialized_dags)
+        self.assertFalse(store_dag_code)


[airflow] 02/02: Fix failing test in DagCode (#9565)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0451378f03b51c4e0890e07f1c3f4be3f6f3a43b
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon Jun 29 15:16:41 2020 +0100

    Fix failing test in DagCode (#9565)
    
    PR https://github.com/apache/airflow/pull/9554 introduced this error. It was not caught because the CI did not run fully during an ongoing GitHub incident (GitHub is down / has degraded performance).
    
    (cherry picked from commit ee0335315e421c8ce9e826f7ac5e8f4c82f171fe)
---
 tests/models/test_dagcode.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/tests/models/test_dagcode.py b/tests/models/test_dagcode.py
index 3eb5f34..2926de6 100644
--- a/tests/models/test_dagcode.py
+++ b/tests/models/test_dagcode.py
@@ -56,6 +56,7 @@ class TestDagCode(unittest.TestCase):
         return [bash_dag, xcom_dag]
 
     @conf_vars({('core', 'store_dag_code'): 'True'})
+    @patch("airflow.models.dag.settings.STORE_DAG_CODE", True)
     def _write_example_dags(self):
         example_dags = make_example_dags(example_dags_module)
         for dag in example_dags.values():
@@ -68,6 +69,7 @@ class TestDagCode(unittest.TestCase):
 
         self._compare_example_dags(example_dags)
 
+    @conf_vars({('core', 'store_dag_code'): 'True'})
     def test_bulk_sync_to_db(self):
         """Dg code can be bulk written into database."""
         example_dags = make_example_dags(example_dags_module)
@@ -78,6 +80,7 @@ class TestDagCode(unittest.TestCase):
 
         self._compare_example_dags(example_dags)
 
+    @conf_vars({('core', 'store_dag_code'): 'True'})
     def test_bulk_sync_to_db_half_files(self):
         """Dg code can be bulk written into database."""
         example_dags = make_example_dags(example_dags_module)
@@ -119,6 +122,8 @@ class TestDagCode(unittest.TestCase):
                 self.assertEqual(result.source_code, source_code)
 
     @conf_vars({('core', 'store_dag_code'): 'True'})
+    @patch("airflow.models.dag.settings.STORE_DAG_CODE", True)
+    @patch("airflow.models.dagcode.STORE_DAG_CODE", True)
     def test_code_can_be_read_when_no_access_to_file(self):
         """
         Test that code can be retrieved from DB when you do not have access to Code file.
@@ -140,6 +145,7 @@ class TestDagCode(unittest.TestCase):
                 self.assertIn(test_string, dag_code)
 
     @conf_vars({('core', 'store_dag_code'): 'True'})
+    @patch("airflow.models.dag.settings.STORE_DAG_CODE", True)
     def test_db_code_updated_on_dag_file_change(self):
         """Test if DagCode is updated in DB when DAG file is changed"""
         example_dag = make_example_dags(example_dags_module).get('example_bash_operator')