You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/02/02 13:46:45 UTC
incubator-airflow git commit: [AIRFLOW-794] Access DAGS_FOLDER and SQL_ALCHEMY_CONN exclusively from settings
Repository: incubator-airflow
Updated Branches:
refs/heads/master 3e6b923f8 -> 20c7ccc9c
[AIRFLOW-794] Access DAGS_FOLDER and SQL_ALCHEMY_CONN exclusively from settings
Closes #2013 from gsakkis/settings
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/20c7ccc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/20c7ccc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/20c7ccc9
Branch: refs/heads/master
Commit: 20c7ccc9cc1105643c85ddf5953615ca481c79e2
Parents: 3e6b923
Author: George Sakkis <ge...@gmail.com>
Authored: Thu Feb 2 14:45:48 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Feb 2 14:46:15 2017 +0100
----------------------------------------------------------------------
airflow/__init__.py | 8 +++-----
airflow/bin/cli.py | 9 ++-------
airflow/configuration.py | 6 ------
airflow/jobs.py | 2 +-
airflow/migrations/env.py | 6 ++----
airflow/models.py | 8 +++-----
airflow/operators/dagrun_operator.py | 3 +--
airflow/utils/db.py | 4 +---
airflow/www/utils.py | 4 +---
airflow/www/views.py | 2 +-
tests/core.py | 3 +--
tests/jobs.py | 9 ++++-----
12 files changed, 20 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/__init__.py b/airflow/__init__.py
index 1e40fe9..3daa6e2 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -24,19 +24,17 @@ from airflow import version
__version__ = version.version
import logging
-import os
import sys
from airflow import configuration as conf
-
+from airflow import settings
from airflow.models import DAG
from flask_admin import BaseView
from importlib import import_module
from airflow.exceptions import AirflowException
-DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
-if DAGS_FOLDER not in sys.path:
- sys.path.append(DAGS_FOLDER)
+if settings.DAGS_FOLDER not in sys.path:
+ sys.path.append(settings.DAGS_FOLDER)
login = None
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index fbd86db..61d8707 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -58,10 +58,8 @@ from airflow.www.app import cached_app
from sqlalchemy import func
from sqlalchemy.orm import exc
-DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
api.load_auth()
-
api_module = import_module(conf.get('cli', 'api_client'))
api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
auth=api.api_auth.client_auth)
@@ -114,11 +112,8 @@ def setup_locations(process, pid=None, stdout=None, stderr=None, log=None):
def process_subdir(subdir):
- dags_folder = conf.get("core", "DAGS_FOLDER")
- dags_folder = os.path.expanduser(dags_folder)
if subdir:
- if "DAGS_FOLDER" in subdir:
- subdir = subdir.replace("DAGS_FOLDER", dags_folder)
+ subdir = subdir.replace('DAGS_FOLDER', settings.DAGS_FOLDER)
subdir = os.path.abspath(os.path.expanduser(subdir))
return subdir
@@ -1128,7 +1123,7 @@ class CLIFactory(object):
'subdir': Arg(
("-sd", "--subdir"),
"File location or directory from which to look for the dag",
- default=DAGS_FOLDER),
+ default=settings.DAGS_FOLDER),
'start_date': Arg(
("-s", "--start_date"), "Override start_date YYYY-MM-DD",
type=parsedate),
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 404808b..6752bdb 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -828,9 +828,3 @@ as_dict.__doc__ = conf.as_dict.__doc__
def set(section, option, value): # noqa
return conf.set(section, option, value)
-
-########################
-# convenience method to access config entries
-
-def get_dags_folder():
- return os.path.expanduser(get('core', 'DAGS_FOLDER'))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 1362814..4b96113 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -463,7 +463,7 @@ class SchedulerJob(BaseJob):
self,
dag_id=None,
dag_ids=None,
- subdir=models.DAGS_FOLDER,
+ subdir=settings.DAGS_FOLDER,
num_runs=-1,
file_process_interval=conf.getint('scheduler',
'min_file_process_interval'),
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/migrations/env.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py
index a107d6c..8d5e55e 100644
--- a/airflow/migrations/env.py
+++ b/airflow/migrations/env.py
@@ -17,7 +17,6 @@ from alembic import context
from logging.config import fileConfig
from airflow import settings
-from airflow import configuration
from airflow.jobs import models
# this is the Alembic Config object, which provides
@@ -54,10 +53,9 @@ def run_migrations_offline():
script output.
"""
- url = configuration.get('core', 'SQL_ALCHEMY_CONN')
context.configure(
- url=url, target_metadata=target_metadata, literal_binds=True,
- compare_type=COMPARE_TYPE)
+ url=settings.SQL_ALCHEMY_CONN, target_metadata=target_metadata,
+ literal_binds=True, compare_type=COMPARE_TYPE)
with context.begin_transaction():
context.run_migrations()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index a4b30f5..6cf7ad9 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -81,8 +81,6 @@ from airflow.utils.trigger_rule import TriggerRule
Base = declarative_base()
ID_LEN = 250
-SQL_ALCHEMY_CONN = configuration.get('core', 'SQL_ALCHEMY_CONN')
-DAGS_FOLDER = os.path.expanduser(configuration.get('core', 'DAGS_FOLDER'))
XCOM_RETURN_KEY = 'return_value'
Stats = settings.Stats
@@ -95,7 +93,7 @@ try:
except:
pass
-if 'mysql' in SQL_ALCHEMY_CONN:
+if 'mysql' in settings.SQL_ALCHEMY_CONN:
LongText = LONGTEXT
else:
LongText = Text
@@ -165,7 +163,7 @@ class DagBag(BaseDagBag, LoggingMixin):
executor=DEFAULT_EXECUTOR,
include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES')):
- dag_folder = dag_folder or DAGS_FOLDER
+ dag_folder = dag_folder or settings.DAGS_FOLDER
self.logger.info("Filling up the DagBag from {}".format(dag_folder))
self.dag_folder = dag_folder
self.dags = {}
@@ -2856,7 +2854,7 @@ class DAG(BaseDag, LoggingMixin):
"""
File location of where the dag object is instantiated
"""
- fn = self.full_filepath.replace(DAGS_FOLDER + '/', '')
+ fn = self.full_filepath.replace(settings.DAGS_FOLDER + '/', '')
fn = fn.replace(os.path.dirname(__file__) + '/', '')
return fn
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index 239ebb4..c3ffa1a 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -14,7 +14,6 @@
from datetime import datetime
import logging
-import os
from airflow.models import BaseOperator, DagBag
from airflow.utils.decorators import apply_defaults
@@ -65,7 +64,7 @@ class TriggerDagRunOperator(BaseOperator):
dro = self.python_callable(context, dro)
if dro:
session = settings.Session()
- dbag = DagBag(os.path.expanduser(conf.get('core', 'DAGS_FOLDER')))
+ dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 9c7b4b3..2502219 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -30,7 +30,6 @@ from sqlalchemy import event, exc
from sqlalchemy.pool import Pool
from airflow import settings
-from airflow import configuration
def provide_session(func):
@@ -287,8 +286,7 @@ def upgradedb():
directory = os.path.join(package_dir, 'migrations')
config = Config(os.path.join(package_dir, 'alembic.ini'))
config.set_main_option('script_location', directory)
- config.set_main_option('sqlalchemy.url',
- configuration.get('core', 'SQL_ALCHEMY_CONN'))
+ config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN)
command.upgrade(config, 'heads')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/www/utils.py
----------------------------------------------------------------------
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 1a1229b..d2218de 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -137,9 +137,7 @@ def notify_owner(f):
if request.args.get('confirmed') == "true":
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
- dagbag = models.DagBag(
- os.path.expanduser(configuration.get('core', 'DAGS_FOLDER')))
-
+ dagbag = models.DagBag(settings.DAGS_FOLDER)
dag = dagbag.get_dag(dag_id)
task = dag.get_task(task_id)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 85baeae..d7c46a7 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -78,7 +78,7 @@ from airflow.configuration import AirflowConfigException
QUERY_LIMIT = 100000
CHART_LIMIT = 200000
-dagbag = models.DagBag(os.path.expanduser(conf.get('core', 'DAGS_FOLDER')))
+dagbag = models.DagBag(settings.DAGS_FOLDER)
login_required = airflow.login.login_required
current_user = airflow.login.current_user
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 6d53aeb..0bf4052 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -15,7 +15,6 @@
from __future__ import print_function
import doctest
-import json
import os
import re
import unittest
@@ -1315,7 +1314,7 @@ class CliTests(unittest.TestCase):
'-s', DEFAULT_DATE.isoformat()]))
def test_process_subdir_path_with_placeholder(self):
- assert cli.process_subdir('DAGS_FOLDER/abc') == os.path.join(configuration.get_dags_folder(), 'abc')
+ assert cli.process_subdir('DAGS_FOLDER/abc') == os.path.join(settings.DAGS_FOLDER, 'abc')
def test_trigger_dag(self):
cli.trigger_dag(self.parser.parse_args([
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/20c7ccc9/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index ee4c8a7..44087e1 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -27,7 +27,6 @@ import sys
from tempfile import mkdtemp
from airflow import AirflowException, settings
-from airflow import models
from airflow.bin import cli
from airflow.jobs import BackfillJob, SchedulerJob
from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
@@ -817,7 +816,7 @@ class SchedulerJobTest(unittest.TestCase):
# Recreated part of the scheduler here, to kick off tasks -> executor
for ti_key in queue:
task = dag.get_task(ti_key[1])
- ti = models.TaskInstance(task, ti_key[2])
+ ti = TI(task, ti_key[2])
# Task starts out in the scheduled state. All tasks in the
# scheduled state will be sent to the executor
ti.state = State.SCHEDULED
@@ -921,7 +920,7 @@ class SchedulerJobTest(unittest.TestCase):
# try to schedule the above DAG repeatedly.
scheduler = SchedulerJob(num_runs=1,
executor=executor,
- subdir=os.path.join(models.DAGS_FOLDER,
+ subdir=os.path.join(settings.DAGS_FOLDER,
"no_dags.py"))
scheduler.heartrate = 0
scheduler.run()
@@ -973,7 +972,7 @@ class SchedulerJobTest(unittest.TestCase):
# try to schedule the above DAG repeatedly.
scheduler = SchedulerJob(num_runs=1,
executor=executor,
- subdir=os.path.join(models.DAGS_FOLDER,
+ subdir=os.path.join(settings.DAGS_FOLDER,
"no_dags.py"))
scheduler.heartrate = 0
scheduler.run()
@@ -1066,7 +1065,7 @@ class SchedulerJobTest(unittest.TestCase):
dag_id = 'exit_test_dag'
dag_ids = [dag_id]
- dag_directory = os.path.join(models.DAGS_FOLDER,
+ dag_directory = os.path.join(settings.DAGS_FOLDER,
"..",
"dags_with_system_exit")
dag_file = os.path.join(dag_directory,