You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/02/02 13:46:45 UTC

incubator-airflow git commit: [AIRFLOW-794] Access DAGS_FOLDER and SQL_ALCHEMY_CONN exclusively from settings

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3e6b923f8 -> 20c7ccc9c


[AIRFLOW-794] Access DAGS_FOLDER and SQL_ALCHEMY_CONN exclusively from settings

Closes #2013 from gsakkis/settings


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/20c7ccc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/20c7ccc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/20c7ccc9

Branch: refs/heads/master
Commit: 20c7ccc9cc1105643c85ddf5953615ca481c79e2
Parents: 3e6b923
Author: George Sakkis <ge...@gmail.com>
Authored: Thu Feb 2 14:45:48 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Feb 2 14:46:15 2017 +0100

----------------------------------------------------------------------
 airflow/__init__.py                  | 8 +++-----
 airflow/bin/cli.py                   | 9 ++-------
 airflow/configuration.py             | 6 ------
 airflow/jobs.py                      | 2 +-
 airflow/migrations/env.py            | 6 ++----
 airflow/models.py                    | 8 +++-----
 airflow/operators/dagrun_operator.py | 3 +--
 airflow/utils/db.py                  | 4 +---
 airflow/www/utils.py                 | 4 +---
 airflow/www/views.py                 | 2 +-
 tests/core.py                        | 3 +--
 tests/jobs.py                        | 9 ++++-----
 12 files changed, 20 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/__init__.py b/airflow/__init__.py
index 1e40fe9..3daa6e2 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -24,19 +24,17 @@ from airflow import version
 __version__ = version.version
 
 import logging
-import os
 import sys
 
 from airflow import configuration as conf
-
+from airflow import settings
 from airflow.models import DAG
 from flask_admin import BaseView
 from importlib import import_module
 from airflow.exceptions import AirflowException
 
-DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
-if DAGS_FOLDER not in sys.path:
-    sys.path.append(DAGS_FOLDER)
+if settings.DAGS_FOLDER not in sys.path:
+    sys.path.append(settings.DAGS_FOLDER)
 
 login = None
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index fbd86db..61d8707 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -58,10 +58,8 @@ from airflow.www.app import cached_app
 from sqlalchemy import func
 from sqlalchemy.orm import exc
 
-DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
 
 api.load_auth()
-
 api_module = import_module(conf.get('cli', 'api_client'))
 api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
                                auth=api.api_auth.client_auth)
@@ -114,11 +112,8 @@ def setup_locations(process, pid=None, stdout=None, stderr=None, log=None):
 
 
 def process_subdir(subdir):
-    dags_folder = conf.get("core", "DAGS_FOLDER")
-    dags_folder = os.path.expanduser(dags_folder)
     if subdir:
-        if "DAGS_FOLDER" in subdir:
-            subdir = subdir.replace("DAGS_FOLDER", dags_folder)
+        subdir = subdir.replace('DAGS_FOLDER', settings.DAGS_FOLDER)
         subdir = os.path.abspath(os.path.expanduser(subdir))
         return subdir
 
@@ -1128,7 +1123,7 @@ class CLIFactory(object):
         'subdir': Arg(
             ("-sd", "--subdir"),
             "File location or directory from which to look for the dag",
-            default=DAGS_FOLDER),
+            default=settings.DAGS_FOLDER),
         'start_date': Arg(
             ("-s", "--start_date"), "Override start_date YYYY-MM-DD",
             type=parsedate),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 404808b..6752bdb 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -828,9 +828,3 @@ as_dict.__doc__ = conf.as_dict.__doc__
 
 def set(section, option, value):  # noqa
     return conf.set(section, option, value)
-
-########################
-# convenience method to access config entries
-
-def get_dags_folder():
-    return os.path.expanduser(get('core', 'DAGS_FOLDER'))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 1362814..4b96113 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -463,7 +463,7 @@ class SchedulerJob(BaseJob):
             self,
             dag_id=None,
             dag_ids=None,
-            subdir=models.DAGS_FOLDER,
+            subdir=settings.DAGS_FOLDER,
             num_runs=-1,
             file_process_interval=conf.getint('scheduler',
                                               'min_file_process_interval'),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/migrations/env.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py
index a107d6c..8d5e55e 100644
--- a/airflow/migrations/env.py
+++ b/airflow/migrations/env.py
@@ -17,7 +17,6 @@ from alembic import context
 from logging.config import fileConfig
 
 from airflow import settings
-from airflow import configuration
 from airflow.jobs import models
 
 # this is the Alembic Config object, which provides
@@ -54,10 +53,9 @@ def run_migrations_offline():
     script output.
 
     """
-    url = configuration.get('core', 'SQL_ALCHEMY_CONN')
     context.configure(
-        url=url, target_metadata=target_metadata, literal_binds=True,
-        compare_type=COMPARE_TYPE)
+        url=settings.SQL_ALCHEMY_CONN, target_metadata=target_metadata,
+        literal_binds=True, compare_type=COMPARE_TYPE)
 
     with context.begin_transaction():
         context.run_migrations()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index a4b30f5..6cf7ad9 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -81,8 +81,6 @@ from airflow.utils.trigger_rule import TriggerRule
 
 Base = declarative_base()
 ID_LEN = 250
-SQL_ALCHEMY_CONN = configuration.get('core', 'SQL_ALCHEMY_CONN')
-DAGS_FOLDER = os.path.expanduser(configuration.get('core', 'DAGS_FOLDER'))
 XCOM_RETURN_KEY = 'return_value'
 
 Stats = settings.Stats
@@ -95,7 +93,7 @@ try:
 except:
     pass
 
-if 'mysql' in SQL_ALCHEMY_CONN:
+if 'mysql' in settings.SQL_ALCHEMY_CONN:
     LongText = LONGTEXT
 else:
     LongText = Text
@@ -165,7 +163,7 @@ class DagBag(BaseDagBag, LoggingMixin):
             executor=DEFAULT_EXECUTOR,
             include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES')):
 
-        dag_folder = dag_folder or DAGS_FOLDER
+        dag_folder = dag_folder or settings.DAGS_FOLDER
         self.logger.info("Filling up the DagBag from {}".format(dag_folder))
         self.dag_folder = dag_folder
         self.dags = {}
@@ -2856,7 +2854,7 @@ class DAG(BaseDag, LoggingMixin):
         """
         File location of where the dag object is instantiated
         """
-        fn = self.full_filepath.replace(DAGS_FOLDER + '/', '')
+        fn = self.full_filepath.replace(settings.DAGS_FOLDER + '/', '')
         fn = fn.replace(os.path.dirname(__file__) + '/', '')
         return fn
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index 239ebb4..c3ffa1a 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -14,7 +14,6 @@
 
 from datetime import datetime
 import logging
-import os
 
 from airflow.models import BaseOperator, DagBag
 from airflow.utils.decorators import apply_defaults
@@ -65,7 +64,7 @@ class TriggerDagRunOperator(BaseOperator):
         dro = self.python_callable(context, dro)
         if dro:
             session = settings.Session()
-            dbag = DagBag(os.path.expanduser(conf.get('core', 'DAGS_FOLDER')))
+            dbag = DagBag(settings.DAGS_FOLDER)
             trigger_dag = dbag.get_dag(self.trigger_dag_id)
             dr = trigger_dag.create_dagrun(
                 run_id=dro.run_id,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 9c7b4b3..2502219 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -30,7 +30,6 @@ from sqlalchemy import event, exc
 from sqlalchemy.pool import Pool
 
 from airflow import settings
-from airflow import configuration
 
 
 def provide_session(func):
@@ -287,8 +286,7 @@ def upgradedb():
     directory = os.path.join(package_dir, 'migrations')
     config = Config(os.path.join(package_dir, 'alembic.ini'))
     config.set_main_option('script_location', directory)
-    config.set_main_option('sqlalchemy.url',
-                           configuration.get('core', 'SQL_ALCHEMY_CONN'))
+    config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN)
     command.upgrade(config, 'heads')
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/www/utils.py
----------------------------------------------------------------------
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 1a1229b..d2218de 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -137,9 +137,7 @@ def notify_owner(f):
         if request.args.get('confirmed') == "true":
             dag_id = request.args.get('dag_id')
             task_id = request.args.get('task_id')
-            dagbag = models.DagBag(
-                os.path.expanduser(configuration.get('core', 'DAGS_FOLDER')))
-
+            dagbag = models.DagBag(settings.DAGS_FOLDER)
             dag = dagbag.get_dag(dag_id)
             task = dag.get_task(task_id)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 85baeae..d7c46a7 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -78,7 +78,7 @@ from airflow.configuration import AirflowConfigException
 QUERY_LIMIT = 100000
 CHART_LIMIT = 200000
 
-dagbag = models.DagBag(os.path.expanduser(conf.get('core', 'DAGS_FOLDER')))
+dagbag = models.DagBag(settings.DAGS_FOLDER)
 
 login_required = airflow.login.login_required
 current_user = airflow.login.current_user

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 6d53aeb..0bf4052 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -15,7 +15,6 @@
 from __future__ import print_function
 
 import doctest
-import json
 import os
 import re
 import unittest
@@ -1315,7 +1314,7 @@ class CliTests(unittest.TestCase):
             '-s', DEFAULT_DATE.isoformat()]))
 
     def test_process_subdir_path_with_placeholder(self):
-        assert cli.process_subdir('DAGS_FOLDER/abc') == os.path.join(configuration.get_dags_folder(), 'abc')
+        assert cli.process_subdir('DAGS_FOLDER/abc') == os.path.join(settings.DAGS_FOLDER, 'abc')
 
     def test_trigger_dag(self):
         cli.trigger_dag(self.parser.parse_args([

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index ee4c8a7..44087e1 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -27,7 +27,6 @@ import sys
 from tempfile import mkdtemp
 
 from airflow import AirflowException, settings
-from airflow import models
 from airflow.bin import cli
 from airflow.jobs import BackfillJob, SchedulerJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
@@ -817,7 +816,7 @@ class SchedulerJobTest(unittest.TestCase):
         # Recreated part of the scheduler here, to kick off tasks -> executor
         for ti_key in queue:
             task = dag.get_task(ti_key[1])
-            ti = models.TaskInstance(task, ti_key[2])
+            ti = TI(task, ti_key[2])
             # Task starts out in the scheduled state. All tasks in the
             # scheduled state will be sent to the executor
             ti.state = State.SCHEDULED
@@ -921,7 +920,7 @@ class SchedulerJobTest(unittest.TestCase):
             # try to schedule the above DAG repeatedly.
             scheduler = SchedulerJob(num_runs=1,
                                      executor=executor,
-                                     subdir=os.path.join(models.DAGS_FOLDER,
+                                     subdir=os.path.join(settings.DAGS_FOLDER,
                                                          "no_dags.py"))
             scheduler.heartrate = 0
             scheduler.run()
@@ -973,7 +972,7 @@ class SchedulerJobTest(unittest.TestCase):
             # try to schedule the above DAG repeatedly.
             scheduler = SchedulerJob(num_runs=1,
                                      executor=executor,
-                                     subdir=os.path.join(models.DAGS_FOLDER,
+                                     subdir=os.path.join(settings.DAGS_FOLDER,
                                                          "no_dags.py"))
             scheduler.heartrate = 0
             scheduler.run()
@@ -1066,7 +1065,7 @@ class SchedulerJobTest(unittest.TestCase):
 
         dag_id = 'exit_test_dag'
         dag_ids = [dag_id]
-        dag_directory = os.path.join(models.DAGS_FOLDER,
+        dag_directory = os.path.join(settings.DAGS_FOLDER,
                                      "..",
                                      "dags_with_system_exit")
         dag_file = os.path.join(dag_directory,