Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/13 02:49:05 UTC

[01/28] incubator-airflow git commit: [AIRFLOW-365] Set dag.fileloc explicitly and use for Code view

Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test 4db8f0796 -> 2a6089728


[AIRFLOW-365] Set dag.fileloc explicitly and use for Code view

Code view for subdags has not been working. I do not think we are able to cleanly figure out where the code for the factory method lives when we process the dags, so we need to save the location when the subdag is created.

Previously, for a subdag, its `fileloc` attribute would be set to the location of the parent dag. I think it is more appropriate to set it to the actual child dag's location instead. We do not lose any information this way (we still have the link to the parent dag, which has its own location), and now we can always read this attribute for the Code view. This should not affect the use of this field for refreshing dags, because we always refresh the parent of a subdag.
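
For reference, a minimal standalone sketch (not the Airflow source itself) of the inspect-based approach the patch uses: the constructor records the caller's source file, which is what ends up in `fileloc`. The class and attribute names here are illustrative.

    import inspect

    class ExampleDag(object):
        def __init__(self, dag_id):
            self.dag_id = dag_id
            # inspect.stack()[1] is the caller's frame; getsourcefile() resolves it
            # to the .py file that instantiated this object, mirroring what the
            # patch stores in dag.fileloc for the Code view.
            self.fileloc = inspect.getsourcefile(inspect.stack()[1][0])

A DAG constructed inside a subdag factory module therefore records that module's path rather than the parent dag file's path.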

Closes #2043 from dhuang/AIRFLOW-365


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a7abcf35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a7abcf35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a7abcf35

Branch: refs/heads/v1-8-test
Commit: a7abcf35b0e228034f746b3d50abd0ca9bd8bede
Parents: 4db8f07
Author: Daniel Huang <dx...@gmail.com>
Authored: Thu Feb 2 13:57:20 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Mar 12 07:54:02 2017 -0700

----------------------------------------------------------------------
 airflow/models.py    |  7 ++++---
 airflow/www/views.py |  5 ++---
 tests/models.py      | 18 ++++++++++++++++++
 3 files changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7abcf35/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 62457f0..d6ab5b8 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -29,6 +29,7 @@ import functools
 import getpass
 import imp
 import importlib
+import inspect
 import zipfile
 import jinja2
 import json
@@ -307,7 +308,6 @@ class DagBag(BaseDagBag, LoggingMixin):
                     if not dag.full_filepath:
                         dag.full_filepath = filepath
                     dag.is_subdag = False
-                    dag.module_name = m.__name__
                     self.bag_dag(dag, parent_dag=dag, root_dag=dag)
                     found_dags.append(dag)
                     found_dags += dag.subdags
@@ -367,7 +367,6 @@ class DagBag(BaseDagBag, LoggingMixin):
         for subdag in dag.subdags:
             subdag.full_filepath = dag.full_filepath
             subdag.parent_dag = dag
-            subdag.fileloc = root_dag.full_filepath
             subdag.is_subdag = True
             self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
         self.logger.debug('Loaded DAG {dag}'.format(**locals()))
@@ -2660,6 +2659,8 @@ class DAG(BaseDag, LoggingMixin):
         self._pickle_id = None
 
         self._description = description
+        # set file location to caller source path
+        self.fileloc = inspect.getsourcefile(inspect.stack()[1][0])
         self.task_dict = dict()
         self.start_date = start_date
         self.end_date = end_date
@@ -3355,7 +3356,7 @@ class DAG(BaseDag, LoggingMixin):
             orm_dag = DagModel(dag_id=dag.dag_id)
             logging.info("Creating ORM DAG for %s",
                          dag.dag_id)
-        orm_dag.fileloc = dag.full_filepath
+        orm_dag.fileloc = dag.fileloc
         orm_dag.is_subdag = dag.is_subdag
         orm_dag.owners = owner
         orm_dag.is_active = True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7abcf35/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index b98bd74..9e68079 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -18,7 +18,6 @@ from past.builtins import basestring, unicode
 import os
 import pkg_resources
 import socket
-import importlib
 from functools import wraps
 from datetime import datetime, timedelta
 import dateutil.parser
@@ -577,8 +576,8 @@ class Airflow(BaseView):
         dag = dagbag.get_dag(dag_id)
         title = dag_id
         try:
-            m = importlib.import_module(dag.module_name)
-            code = inspect.getsource(m)
+            with open(dag.fileloc, 'r') as f:
+                code = f.read()
             html_code = highlight(
                 code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
         except IOError as e:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7abcf35/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 867e293..7ca01e7 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -200,6 +200,24 @@ class DagBagTest(unittest.TestCase):
         assert dagbag.get_dag(dag_id) != None
         assert dagbag.process_file_calls == 1
 
+    def test_get_dag_fileloc(self):
+        """
+        Test that fileloc is correctly set when we load example DAGs,
+        specifically SubDAGs.
+        """
+        dagbag = models.DagBag(include_examples=True)
+
+        expected = {
+            'example_bash_operator': 'example_bash_operator.py',
+            'example_subdag_operator': 'example_subdag_operator.py',
+            'example_subdag_operator.section-1': 'subdags/subdag.py'
+        }
+
+        for dag_id, path in expected.items():
+            dag = dagbag.get_dag(dag_id)
+            self.assertTrue(
+                dag.fileloc.endswith('airflow/example_dags/' + path))
+
 
 class TaskInstanceTest(unittest.TestCase):
 


[05/28] incubator-airflow git commit: [AIRFLOW-830][AIRFLOW-829][AIRFLOW-88] Reduce Travis log verbosity

Posted by bo...@apache.org.
[AIRFLOW-830][AIRFLOW-829][AIRFLOW-88] Reduce Travis log verbosity

[AIRFLOW-829][AIRFLOW-88] Reduce verbosity of Travis tests

Remove the -s flag for Travis unit tests to suppress output from successful tests.

[AIRFLOW-830] Reduce plugins manager verbosity

The plugin manager prints all status to INFO, which is unnecessary and overly verbose.
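
A minimal sketch (not the plugins manager itself) of why moving these messages from INFO to DEBUG quiets the default output: with the logger at the usual INFO level, debug() calls are suppressed unless verbosity is explicitly raised. The module and plugin names below are illustrative.

    import logging

    logging.basicConfig(level=logging.INFO)

    logging.info('Importing plugin module example_plugin.py')    # emitted by default
    logging.debug('Creating module airflow.operators.example')   # hidden unless level=DEBUG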

Closes #2049 from jlowin/reduce-logs


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3b1e81ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3b1e81ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3b1e81ac

Branch: refs/heads/v1-8-test
Commit: 3b1e81ac9e8e97b6d2a4c3217df81db9ddbd0900
Parents: e1d0adb
Author: Jeremiah Lowin <jl...@apache.org>
Authored: Wed Feb 8 08:32:25 2017 -0500
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:03:25 2017 -0700

----------------------------------------------------------------------
 airflow/plugins_manager.py |  4 ++--
 run_unit_tests.sh          | 36 ++++++++++++++++++++++++------------
 2 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3b1e81ac/airflow/plugins_manager.py
----------------------------------------------------------------------
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index e0af20c..83aae23 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -72,7 +72,7 @@ for root, dirs, files in os.walk(plugins_folder, followlinks=True):
             if file_ext != '.py':
                 continue
 
-            logging.info('Importing plugin module ' + filepath)
+            logging.debug('Importing plugin module ' + filepath)
             # normalize root path as namespace
             namespace = '_'.join([re.sub(norm_pattern, '__', root), mod_name])
 
@@ -92,7 +92,7 @@ for root, dirs, files in os.walk(plugins_folder, followlinks=True):
 
 
 def make_module(name, objects):
-    logging.info('Creating module ' + name)
+    logging.debug('Creating module ' + name)
     name = name.lower()
     module = imp.new_module(name)
     module._name = name.split('.')[-1]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3b1e81ac/run_unit_tests.sh
----------------------------------------------------------------------
diff --git a/run_unit_tests.sh b/run_unit_tests.sh
index c922a55..b3ee074 100755
--- a/run_unit_tests.sh
+++ b/run_unit_tests.sh
@@ -28,17 +28,6 @@ export AIRFLOW_USE_NEW_IMPORTS=1
 # any argument received is overriding the default nose execution arguments:
 
 nose_args=$@
-if [ -z "$nose_args" ]; then
-  nose_args="--with-coverage \
---cover-erase \
---cover-html \
---cover-package=airflow \
---cover-html-dir=airflow/www/static/coverage \
---with-ignore-docstrings \
--s \
--v \
---logging-level=DEBUG "
-fi
 
 #--with-doctest
 
@@ -50,7 +39,18 @@ yes | airflow resetdb
 airflow initdb
 
 if [ "${TRAVIS}" ]; then
-  # For impersonation tests running on SQLite on Travis, make the database world readable so other 
+    if [ -z "$nose_args" ]; then
+      nose_args="--with-coverage \
+    --cover-erase \
+    --cover-html \
+    --cover-package=airflow \
+    --cover-html-dir=airflow/www/static/coverage \
+    --with-ignore-docstrings \
+    -v \
+    --logging-level=DEBUG "
+    fi
+
+  # For impersonation tests running on SQLite on Travis, make the database world readable so other
   # users can update it
   AIRFLOW_DB="/home/travis/airflow/airflow.db"
   if [ -f "${AIRFLOW_DB}" ]; then
@@ -60,6 +60,18 @@ if [ "${TRAVIS}" ]; then
   # For impersonation tests on Travis, make airflow accessible to other users via the global PATH
   # (which contains /usr/local/bin)
   sudo ln -s "${VIRTUAL_ENV}/bin/airflow" /usr/local/bin/
+else
+    if [ -z "$nose_args" ]; then
+      nose_args="--with-coverage \
+    --cover-erase \
+    --cover-html \
+    --cover-package=airflow \
+    --cover-html-dir=airflow/www/static/coverage \
+    --with-ignore-docstrings \
+    -s \
+    -v \
+    --logging-level=DEBUG "
+    fi
 fi
 
 echo "Starting the unit tests with the following nose arguments: "$nose_args


[08/28] incubator-airflow git commit: [AIRFLOW-861] make pickle_info endpoint be login_required

Posted by bo...@apache.org.
[AIRFLOW-861] make pickle_info endpoint be login_required

Testing Done:
- Unittests pass

Closes #2077 from saguziel/aguziel-fix-login-required
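
A generic sketch of the pattern this change applies, using flask_admin and flask_login directly (Airflow wires its own login_required from airflow.login, as seen elsewhere in views.py; the view class here is hypothetical):

    from flask_admin import BaseView, expose
    from flask_login import login_required

    class ExampleView(BaseView):
        @expose('/pickle_info')
        @login_required
        def pickle_info(self):
            # Only reached for an authenticated session; anonymous requests are
            # handled by flask_login before the view method runs.
            return '{}'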


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ff0fa00d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ff0fa00d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ff0fa00d

Branch: refs/heads/v1-8-test
Commit: ff0fa00d82bfebbe9b2b9ff957e4d77db0891e7f
Parents: 1017008
Author: Alex Guziel <al...@airbnb.com>
Authored: Fri Feb 17 11:45:45 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:14:26 2017 -0700

----------------------------------------------------------------------
 airflow/www/views.py | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ff0fa00d/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 0391775..bda4921 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -640,6 +640,7 @@ class Airflow(BaseView):
         return wwwutils.json_response(d)
 
     @expose('/pickle_info')
+    @login_required
     def pickle_info(self):
         d = {}
         dag_id = request.args.get('dag_id')


[22/28] incubator-airflow git commit: [AIRFLOW-900] Fixes bugs in LocalTaskJob for double run protection

Posted by bo...@apache.org.
[AIRFLOW-900] Fixes bugs in LocalTaskJob for double run protection

Right now, triggering a second task instance will cause both it and the original task to run, because the hostname and pid fields are updated regardless of whether the task is already running. Also, the pid field is not refreshed from the db properly, and we should check against the parent's pid.

Will be followed up by working tests.
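
A standalone sketch of the descendant-process check this patch introduces, using the same psutil calls as the diff below (the helper name mirrors the new LocalTaskJob method; everything else is illustrative):

    import os
    import psutil

    def is_descendant_process(pid):
        """Return True if pid belongs to a descendant of the current process."""
        try:
            return psutil.Process(pid) in psutil.Process().children(recursive=True)
        except psutil.NoSuchProcess:
            return False

    # A pid recorded by a different worker (or a stale pid) is not a descendant of
    # this process, so the heartbeat callback can raise and self-terminate.
    print(is_descendant_process(os.getpid()))  # False: a process is not its own child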

Closes #2102 from saguziel/aguziel-fix-trigger-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1243ab16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1243ab16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1243ab16

Branch: refs/heads/v1-8-test
Commit: 1243ab16849ab9716b26aeba6a11ea3e9e9a81ca
Parents: a8f2c27
Author: Alex Guziel <al...@airbnb.com>
Authored: Sat Mar 11 10:54:39 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:34:45 2017 -0700

----------------------------------------------------------------------
 airflow/jobs.py                 | 41 ++++++++++++++-----------
 airflow/models.py               |  2 ++
 tests/core.py                   | 59 ++++++++++++++++++++++++++++++++++++
 tests/dags/sleep_forever_dag.py | 29 ++++++++++++++++++
 4 files changed, 113 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index c61b229..222d9ba 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2072,15 +2072,6 @@ class LocalTaskJob(BaseJob):
         try:
             self.task_runner.start()
 
-            ti = self.task_instance
-            session = settings.Session()
-            if self.task_runner.process:
-                ti.pid = self.task_runner.process.pid
-            ti.hostname = socket.getfqdn()
-            session.merge(ti)
-            session.commit()
-            session.close()
-
             last_heartbeat_time = time.time()
             heartbeat_time_limit = conf.getint('scheduler',
                                                'scheduler_zombie_task_threshold')
@@ -2120,6 +2111,18 @@ class LocalTaskJob(BaseJob):
         self.task_runner.terminate()
         self.task_runner.on_finish()
 
+    def _is_descendant_process(self, pid):
+        """Checks if pid is a descendant of the current process.
+
+        :param pid: process id to check
+        :type pid: int
+        :rtype: bool
+        """
+        try:
+            return psutil.Process(pid) in psutil.Process().children(recursive=True)
+        except psutil.NoSuchProcess:
+            return False
+
     @provide_session
     def heartbeat_callback(self, session=None):
         """Self destruct task if state has been moved away from running externally"""
@@ -2133,15 +2136,17 @@ class LocalTaskJob(BaseJob):
         if ti.state == State.RUNNING:
             self.was_running = True
             fqdn = socket.getfqdn()
-            if not (fqdn == ti.hostname and
-                    self.task_runner.process.pid == ti.pid):
-                logging.warning("Recorded hostname and pid of {ti.hostname} "
-                                "and {ti.pid} do not match this instance's "
-                                "which are {fqdn} and "
-                                "{self.task_runner.process.pid}. "
-                                "Taking the poison pill. So long."
-                                .format(**locals()))
-                raise AirflowException("Another worker/process is running this job")
+            if fqdn != ti.hostname:
+                logging.warning("The recorded hostname {ti.hostname} "
+                                "does not match this instance's hostname "
+                                "{fqdn}".format(**locals()))
+                raise AirflowException("Hostname of job runner does not match")
+            elif not self._is_descendant_process(ti.pid):
+                current_pid = os.getpid()
+                logging.warning("Recorded pid {ti.pid} is not a "
+                                "descendant of the current pid "
+                                "{current_pid}".format(**locals()))
+                raise AirflowException("PID of job runner does not match")
         elif (self.was_running
               and self.task_runner.return_code() is None
               and hasattr(self.task_runner, 'process')):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 32c52ac..7c6590f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -997,6 +997,7 @@ class TaskInstance(Base):
             self.end_date = ti.end_date
             self.try_number = ti.try_number
             self.hostname = ti.hostname
+            self.pid = ti.pid
         else:
             self.state = None
 
@@ -1320,6 +1321,7 @@ class TaskInstance(Base):
         if not test_mode:
             session.add(Log(State.RUNNING, self))
         self.state = State.RUNNING
+        self.pid = os.getpid()
         self.end_date = None
         if not test_mode:
             session.merge(self)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index ee7a738..636ad43 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -26,6 +26,7 @@ from datetime import datetime, time, timedelta
 from email.mime.multipart import MIMEMultipart
 from email.mime.application import MIMEApplication
 import signal
+from time import time as timetime
 from time import sleep
 import warnings
 
@@ -895,6 +896,64 @@ class CoreTest(unittest.TestCase):
                 trigger_rule="non_existant",
                 dag=self.dag)
 
+    def test_run_task_twice(self):
+        """If two copies of a TI run, the new one should die, and old should live"""
+        dagbag = models.DagBag(
+            dag_folder=TEST_DAG_FOLDER,
+            include_examples=False,
+        )
+        TI = models.TaskInstance
+        dag = dagbag.dags.get('sleep_forever_dag')
+        task = dag.task_dict.get('sleeps_forever')
+    
+        ti = TI(task=task, execution_date=DEFAULT_DATE)
+        job1 = jobs.LocalTaskJob(
+            task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        job2 = jobs.LocalTaskJob(
+            task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+
+        p1 = multiprocessing.Process(target=job1.run)
+        p2 = multiprocessing.Process(target=job2.run)
+        try:
+            p1.start()
+            start_time = timetime()
+            sleep(5.0) # must wait for session to be created on p1
+            settings.engine.dispose()
+            session = settings.Session()
+            ti.refresh_from_db(session=session)
+            self.assertEqual(State.RUNNING, ti.state)
+            p1pid = ti.pid
+            settings.engine.dispose()
+            p2.start()
+            p2.join(5) # wait 5 seconds until termination
+            self.assertFalse(p2.is_alive())
+            self.assertTrue(p1.is_alive())
+
+            settings.engine.dispose()
+            session = settings.Session()
+            ti.refresh_from_db(session=session)
+            self.assertEqual(State.RUNNING, ti.state)
+            self.assertEqual(p1pid, ti.pid)
+
+            # check changing hostname kills task
+            ti.refresh_from_db(session=session, lock_for_update=True)
+            ti.hostname = 'nonexistenthostname'
+            session.merge(ti)
+            session.commit()
+
+            p1.join(5)
+            self.assertFalse(p1.is_alive())
+        finally:
+            try:
+                p1.terminate()
+            except AttributeError:
+                pass # process already terminated
+            try:
+                p2.terminate()
+            except AttributeError:
+                pass # process already terminated
+            session.close()
+
     def test_terminate_task(self):
         """If a task instance's db state get deleted, it should fail"""
         TI = models.TaskInstance

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1243ab16/tests/dags/sleep_forever_dag.py
----------------------------------------------------------------------
diff --git a/tests/dags/sleep_forever_dag.py b/tests/dags/sleep_forever_dag.py
new file mode 100644
index 0000000..b1f810e
--- /dev/null
+++ b/tests/dags/sleep_forever_dag.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Used for unit tests"""
+import airflow
+from airflow.operators.bash_operator import BashOperator
+from airflow.models import DAG
+
+dag = DAG(
+    dag_id='sleep_forever_dag',
+    schedule_interval=None,
+)
+
+task = BashOperator(
+    task_id='sleeps_forever',
+    dag=dag,
+    bash_command="sleep 10000000000",
+    start_date=airflow.utils.dates.days_ago(2),
+    owner='airflow')


[28/28] incubator-airflow git commit: Update changelog for 1.8.0

Posted by bo...@apache.org.
Update changelog for 1.8.0


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2a608972
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2a608972
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2a608972

Branch: refs/heads/v1-8-test
Commit: 2a6089728841e1f4bb060345b5c251b3ff73d13d
Parents: f171d17
Author: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Authored: Sun Mar 12 19:48:04 2017 -0700
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 19:48:04 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.txt | 35 ++++++++++++++++++++++++++++++++++-
 1 file changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2a608972/CHANGELOG.txt
----------------------------------------------------------------------
diff --git a/CHANGELOG.txt b/CHANGELOG.txt
index 8da887c..5048128 100644
--- a/CHANGELOG.txt
+++ b/CHANGELOG.txt
@@ -1,6 +1,39 @@
-AIRFLOW 1.8.0, 2017-02-02
+AIRFLOW 1.8.0, 2017-03-12
 -------------------------
 
+[AIRFLOW-900] Double trigger should not kill original task instance
+[AIRFLOW-900] Fixes bugs in LocalTaskJob for double run protection
+[AIRFLOW-932] Do not mark tasks removed when backfilling
+[AIRFLOW-961] run onkill when SIGTERMed
+[AIRFLOW-910] Use parallel task execution for backfills
+[AIRFLOW-967] Wrap strings in native for py2 ldap compatibility
+[AIRFLOW-941] Use defined parameters for psycopg2
+[AIRFLOW-719] Prevent DAGs from ending prematurely
+[AIRFLOW-938] Use test for True in task_stats queries
+[AIRFLOW-937] Improve performance of task_stats
+[AIRFLOW-933] use ast.literal_eval rather eval because ast.literal_eval does not execute input.
+[AIRFLOW-925] Revert airflow.hooks change that cherry-pick picked
+[AIRFLOW-919] Running tasks with no start date shouldn't break a DAGs UI
+[AIRFLOW-802] Add spark-submit operator/hook
+[AIRFLOW-897] Prevent dagruns from failing with unfinished tasks
+[AIRFLOW-861] make pickle_info endpoint be login_required
+[AIRFLOW-853] use utf8 encoding for stdout line decode
+[AIRFLOW-856] Make sure execution date is set for local client
+[AIRFLOW-830][AIRFLOW-829][AIRFLOW-88] Reduce Travis log verbosity
+[AIRFLOW-831] Restore import to fix broken tests
+[AIRFLOW-794] Access DAGS_FOLDER and SQL_ALCHEMY_CONN exclusively from settings
+[AIRFLOW-694] Fix config behaviour for empty envvar
+[AIRFLOW-365] Set dag.fileloc explicitly and use for Code view
+[AIRFLOW-931] Do not set QUEUED in TaskInstances
+[AIRFLOW-899] Tasks in SCHEDULED state should be white in the UI instead of black
+[AIRFLOW-895] Address Apache release incompliancies
+[AIRFLOW-893][AIRFLOW-510] Fix crashing webservers when a dagrun has no start date
+[AIRFLOW-793] Enable compressed loading in S3ToHiveTransfer
+[AIRFLOW-863] Example DAGs should have recent start dates
+[AIRFLOW-869] Refactor mark success functionality
+[AIRFLOW-856] Make sure execution date is set for local client
+[AIRFLOW-814] Fix Presto*CheckOperator.__init__
+[AIRFLOW-844] Fix cgroups directory creation
 [AIRFLOW-816] Use static nvd3 and d3
 [AIRFLOW-821] Fix py3 compatibility
 [AIRFLOW-817] Check for None value of execution_date in endpoint


[03/28] incubator-airflow git commit: [AIRFLOW-794] Access DAGS_FOLDER and SQL_ALCHEMY_CONN exclusively from settings

Posted by bo...@apache.org.
[AIRFLOW-794] Access DAGS_FOLDER and SQL_ALCHEMY_CONN exclusively from settings

Closes #2013 from gsakkis/settings
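
A small sketch of the pattern this change standardizes on, replacing per-module conf.get()/os.path.expanduser() calls with the values already resolved in airflow.settings (the actual paths come from the local configuration):

    # Before: each module resolved the value itself
    #   DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
    #   SQL_ALCHEMY_CONN = configuration.get('core', 'SQL_ALCHEMY_CONN')

    # After: read the values resolved once in settings
    from airflow import settings

    dag_folder = settings.DAGS_FOLDER
    db_url = settings.SQL_ALCHEMY_CONN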


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/25920242
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/25920242
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/25920242

Branch: refs/heads/v1-8-test
Commit: 2592024230a25820d368ecc3bd43fbf7b52e46d9
Parents: 5405f5f
Author: George Sakkis <ge...@gmail.com>
Authored: Thu Feb 2 14:45:48 2017 +0100
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 07:57:04 2017 -0700

----------------------------------------------------------------------
 airflow/__init__.py                  | 8 +++-----
 airflow/bin/cli.py                   | 9 ++-------
 airflow/configuration.py             | 6 ------
 airflow/jobs.py                      | 2 +-
 airflow/migrations/env.py            | 6 ++----
 airflow/models.py                    | 8 +++-----
 airflow/operators/dagrun_operator.py | 3 +--
 airflow/utils/db.py                  | 4 +---
 airflow/www/utils.py                 | 4 +---
 airflow/www/views.py                 | 2 +-
 tests/core.py                        | 3 +--
 tests/jobs.py                        | 9 ++++-----
 12 files changed, 20 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/__init__.py b/airflow/__init__.py
index 1e40fe9..3daa6e2 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -24,19 +24,17 @@ from airflow import version
 __version__ = version.version
 
 import logging
-import os
 import sys
 
 from airflow import configuration as conf
-
+from airflow import settings
 from airflow.models import DAG
 from flask_admin import BaseView
 from importlib import import_module
 from airflow.exceptions import AirflowException
 
-DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
-if DAGS_FOLDER not in sys.path:
-    sys.path.append(DAGS_FOLDER)
+if settings.DAGS_FOLDER not in sys.path:
+    sys.path.append(settings.DAGS_FOLDER)
 
 login = None
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index fbd86db..61d8707 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -58,10 +58,8 @@ from airflow.www.app import cached_app
 from sqlalchemy import func
 from sqlalchemy.orm import exc
 
-DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
 
 api.load_auth()
-
 api_module = import_module(conf.get('cli', 'api_client'))
 api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
                                auth=api.api_auth.client_auth)
@@ -114,11 +112,8 @@ def setup_locations(process, pid=None, stdout=None, stderr=None, log=None):
 
 
 def process_subdir(subdir):
-    dags_folder = conf.get("core", "DAGS_FOLDER")
-    dags_folder = os.path.expanduser(dags_folder)
     if subdir:
-        if "DAGS_FOLDER" in subdir:
-            subdir = subdir.replace("DAGS_FOLDER", dags_folder)
+        subdir = subdir.replace('DAGS_FOLDER', settings.DAGS_FOLDER)
         subdir = os.path.abspath(os.path.expanduser(subdir))
         return subdir
 
@@ -1128,7 +1123,7 @@ class CLIFactory(object):
         'subdir': Arg(
             ("-sd", "--subdir"),
             "File location or directory from which to look for the dag",
-            default=DAGS_FOLDER),
+            default=settings.DAGS_FOLDER),
         'start_date': Arg(
             ("-s", "--start_date"), "Override start_date YYYY-MM-DD",
             type=parsedate),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 404808b..6752bdb 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -828,9 +828,3 @@ as_dict.__doc__ = conf.as_dict.__doc__
 
 def set(section, option, value):  # noqa
     return conf.set(section, option, value)
-
-########################
-# convenience method to access config entries
-
-def get_dags_folder():
-    return os.path.expanduser(get('core', 'DAGS_FOLDER'))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 3ca0070..fedad55 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -463,7 +463,7 @@ class SchedulerJob(BaseJob):
             self,
             dag_id=None,
             dag_ids=None,
-            subdir=models.DAGS_FOLDER,
+            subdir=settings.DAGS_FOLDER,
             num_runs=-1,
             file_process_interval=conf.getint('scheduler',
                                               'min_file_process_interval'),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/migrations/env.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py
index a107d6c..8d5e55e 100644
--- a/airflow/migrations/env.py
+++ b/airflow/migrations/env.py
@@ -17,7 +17,6 @@ from alembic import context
 from logging.config import fileConfig
 
 from airflow import settings
-from airflow import configuration
 from airflow.jobs import models
 
 # this is the Alembic Config object, which provides
@@ -54,10 +53,9 @@ def run_migrations_offline():
     script output.
 
     """
-    url = configuration.get('core', 'SQL_ALCHEMY_CONN')
     context.configure(
-        url=url, target_metadata=target_metadata, literal_binds=True,
-        compare_type=COMPARE_TYPE)
+        url=settings.SQL_ALCHEMY_CONN, target_metadata=target_metadata,
+        literal_binds=True, compare_type=COMPARE_TYPE)
 
     with context.begin_transaction():
         context.run_migrations()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index d6ab5b8..1829ff3 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -81,8 +81,6 @@ from airflow.utils.trigger_rule import TriggerRule
 
 Base = declarative_base()
 ID_LEN = 250
-SQL_ALCHEMY_CONN = configuration.get('core', 'SQL_ALCHEMY_CONN')
-DAGS_FOLDER = os.path.expanduser(configuration.get('core', 'DAGS_FOLDER'))
 XCOM_RETURN_KEY = 'return_value'
 
 Stats = settings.Stats
@@ -95,7 +93,7 @@ try:
 except:
     pass
 
-if 'mysql' in SQL_ALCHEMY_CONN:
+if 'mysql' in settings.SQL_ALCHEMY_CONN:
     LongText = LONGTEXT
 else:
     LongText = Text
@@ -165,7 +163,7 @@ class DagBag(BaseDagBag, LoggingMixin):
             executor=DEFAULT_EXECUTOR,
             include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES')):
 
-        dag_folder = dag_folder or DAGS_FOLDER
+        dag_folder = dag_folder or settings.DAGS_FOLDER
         self.logger.info("Filling up the DagBag from {}".format(dag_folder))
         self.dag_folder = dag_folder
         self.dags = {}
@@ -2858,7 +2856,7 @@ class DAG(BaseDag, LoggingMixin):
         """
         File location of where the dag object is instantiated
         """
-        fn = self.full_filepath.replace(DAGS_FOLDER + '/', '')
+        fn = self.full_filepath.replace(settings.DAGS_FOLDER + '/', '')
         fn = fn.replace(os.path.dirname(__file__) + '/', '')
         return fn
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index 239ebb4..c3ffa1a 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -14,7 +14,6 @@
 
 from datetime import datetime
 import logging
-import os
 
 from airflow.models import BaseOperator, DagBag
 from airflow.utils.decorators import apply_defaults
@@ -65,7 +64,7 @@ class TriggerDagRunOperator(BaseOperator):
         dro = self.python_callable(context, dro)
         if dro:
             session = settings.Session()
-            dbag = DagBag(os.path.expanduser(conf.get('core', 'DAGS_FOLDER')))
+            dbag = DagBag(settings.DAGS_FOLDER)
             trigger_dag = dbag.get_dag(self.trigger_dag_id)
             dr = trigger_dag.create_dagrun(
                 run_id=dro.run_id,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 9c7b4b3..2502219 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -30,7 +30,6 @@ from sqlalchemy import event, exc
 from sqlalchemy.pool import Pool
 
 from airflow import settings
-from airflow import configuration
 
 
 def provide_session(func):
@@ -287,8 +286,7 @@ def upgradedb():
     directory = os.path.join(package_dir, 'migrations')
     config = Config(os.path.join(package_dir, 'alembic.ini'))
     config.set_main_option('script_location', directory)
-    config.set_main_option('sqlalchemy.url',
-                           configuration.get('core', 'SQL_ALCHEMY_CONN'))
+    config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN)
     command.upgrade(config, 'heads')
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/www/utils.py
----------------------------------------------------------------------
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 1a1229b..d2218de 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -137,9 +137,7 @@ def notify_owner(f):
         if request.args.get('confirmed') == "true":
             dag_id = request.args.get('dag_id')
             task_id = request.args.get('task_id')
-            dagbag = models.DagBag(
-                os.path.expanduser(configuration.get('core', 'DAGS_FOLDER')))
-
+            dagbag = models.DagBag(settings.DAGS_FOLDER)
             dag = dagbag.get_dag(dag_id)
             task = dag.get_task(task_id)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 9e68079..0391775 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -77,7 +77,7 @@ from airflow.configuration import AirflowConfigException
 QUERY_LIMIT = 100000
 CHART_LIMIT = 200000
 
-dagbag = models.DagBag(os.path.expanduser(conf.get('core', 'DAGS_FOLDER')))
+dagbag = models.DagBag(settings.DAGS_FOLDER)
 
 login_required = airflow.login.login_required
 current_user = airflow.login.current_user

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 3e76e81..ee7a738 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -15,7 +15,6 @@
 from __future__ import print_function
 
 import doctest
-import json
 import os
 import re
 import unittest
@@ -1315,7 +1314,7 @@ class CliTests(unittest.TestCase):
             '-s', DEFAULT_DATE.isoformat()]))
 
     def test_process_subdir_path_with_placeholder(self):
-        assert cli.process_subdir('DAGS_FOLDER/abc') == os.path.join(configuration.get_dags_folder(), 'abc')
+        assert cli.process_subdir('DAGS_FOLDER/abc') == os.path.join(settings.DAGS_FOLDER, 'abc')
 
     def test_trigger_dag(self):
         cli.trigger_dag(self.parser.parse_args([

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/25920242/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index ee4c8a7..44087e1 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -27,7 +27,6 @@ import sys
 from tempfile import mkdtemp
 
 from airflow import AirflowException, settings
-from airflow import models
 from airflow.bin import cli
 from airflow.jobs import BackfillJob, SchedulerJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
@@ -817,7 +816,7 @@ class SchedulerJobTest(unittest.TestCase):
         # Recreated part of the scheduler here, to kick off tasks -> executor
         for ti_key in queue:
             task = dag.get_task(ti_key[1])
-            ti = models.TaskInstance(task, ti_key[2])
+            ti = TI(task, ti_key[2])
             # Task starts out in the scheduled state. All tasks in the
             # scheduled state will be sent to the executor
             ti.state = State.SCHEDULED
@@ -921,7 +920,7 @@ class SchedulerJobTest(unittest.TestCase):
             # try to schedule the above DAG repeatedly.
             scheduler = SchedulerJob(num_runs=1,
                                      executor=executor,
-                                     subdir=os.path.join(models.DAGS_FOLDER,
+                                     subdir=os.path.join(settings.DAGS_FOLDER,
                                                          "no_dags.py"))
             scheduler.heartrate = 0
             scheduler.run()
@@ -973,7 +972,7 @@ class SchedulerJobTest(unittest.TestCase):
             # try to schedule the above DAG repeatedly.
             scheduler = SchedulerJob(num_runs=1,
                                      executor=executor,
-                                     subdir=os.path.join(models.DAGS_FOLDER,
+                                     subdir=os.path.join(settings.DAGS_FOLDER,
                                                          "no_dags.py"))
             scheduler.heartrate = 0
             scheduler.run()
@@ -1066,7 +1065,7 @@ class SchedulerJobTest(unittest.TestCase):
 
         dag_id = 'exit_test_dag'
         dag_ids = [dag_id]
-        dag_directory = os.path.join(models.DAGS_FOLDER,
+        dag_directory = os.path.join(settings.DAGS_FOLDER,
                                      "..",
                                      "dags_with_system_exit")
         dag_file = os.path.join(dag_directory,


[07/28] incubator-airflow git commit: [AIRFLOW-853] use utf8 encoding for stdout line decode

Posted by bo...@apache.org.
[AIRFLOW-853] use utf8 encoding for stdout line decode

Closes #2060 from ming-wu/master
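
A small sketch of the difference the explicit encoding makes: under Python 2, str.decode() defaults to ASCII and raises UnicodeDecodeError on non-ASCII command output, while decode('utf_8'), as in the patch, handles it on both Python 2 and 3. The sample bytes are illustrative.

    line = b'r\xc3\xa9sultat: ok\n'       # "résultat: ok" encoded as UTF-8
    print(line.decode('utf_8').strip())    # decodes correctly on Python 2 and 3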


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/10170085
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/10170085
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/10170085

Branch: refs/heads/v1-8-test
Commit: 101700853896fdb90cda4267b5310e6c8811f4f0
Parents: 3918e5e
Author: Ming Wu <mi...@ubisoft.com>
Authored: Fri Feb 10 19:47:47 2017 -0500
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:10:12 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/operators/ssh_execute_operator.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/10170085/airflow/contrib/operators/ssh_execute_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ssh_execute_operator.py b/airflow/contrib/operators/ssh_execute_operator.py
index dd4c3b4..dd9e197 100644
--- a/airflow/contrib/operators/ssh_execute_operator.py
+++ b/airflow/contrib/operators/ssh_execute_operator.py
@@ -142,7 +142,7 @@ class SSHExecuteOperator(BaseOperator):
             logging.info("Output:")
             line = ''
             for line in iter(sp.stdout.readline, b''):
-                line = line.decode().strip()
+                line = line.decode('utf_8').strip()
                 logging.info(line)
             sp.wait()
             logging.info("Command exited with "


[04/28] incubator-airflow git commit: [AIRFLOW-831] Restore import to fix broken tests

Posted by bo...@apache.org.
[AIRFLOW-831] Restore import to fix broken tests

The global `models` object is used in the code and was inadvertently removed. This PR restores it.

Closes #2050 from jlowin/fix-broken-tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e1d0adb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e1d0adb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e1d0adb6

Branch: refs/heads/v1-8-test
Commit: e1d0adb61d6475154ada7347ea30404f0680e779
Parents: 2592024
Author: Jeremiah Lowin <jl...@apache.org>
Authored: Thu Feb 2 11:56:22 2017 -0500
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 07:58:12 2017 -0700

----------------------------------------------------------------------
 tests/jobs.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e1d0adb6/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 44087e1..e520b44 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -26,7 +26,7 @@ import six
 import sys
 from tempfile import mkdtemp
 
-from airflow import AirflowException, settings
+from airflow import AirflowException, settings, models
 from airflow.bin import cli
 from airflow.jobs import BackfillJob, SchedulerJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI


[10/28] incubator-airflow git commit: [AIRFLOW-802][AIRFLOW-1] Add spark-submit operator/hook

Posted by bo...@apache.org.
[AIRFLOW-802][AIRFLOW-1] Add spark-submit operator/hook

Add an operator for spark-submit to kick off Apache Spark jobs from Airflow. This allows the user to maintain the configuration of the master and yarn queue within Airflow by using connections. Add a default connection_id to the initdb routine to set spark to yarn by default. Add unit tests to verify the behaviour of the spark-submit operator and hook.
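
A hedged usage sketch of the new operator, based only on the constructor arguments visible in the diff below; the task_id, file path, and surrounding `dag` object are illustrative:

    from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

    submit_job = SparkSubmitOperator(
        task_id='submit_example_job',
        application='/path/to/job.py',   # jar or .py file handed to spark-submit
        conn_id='spark_default',         # defaults to yarn if the connection is missing
        executor_cores=2,
        executor_memory='2G',
        num_executors=4,
        name='airflow-spark-example',
        verbose=False,
        dag=dag,                         # assumes an existing DAG object named `dag`
    )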

Closes #2042 from Fokko/airflow-802


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/01494fd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/01494fd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/01494fd4

Branch: refs/heads/v1-8-test
Commit: 01494fd4c0633dbb57f231ee17e015f42a5ecf24
Parents: c29af46
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Mon Feb 27 13:45:24 2017 +0100
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:19:37 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/__init__.py               |   1 +
 airflow/contrib/hooks/spark_submit_hook.py      | 226 +++++++++++++++++++
 airflow/contrib/operators/__init__.py           |   1 +
 .../contrib/operators/spark_submit_operator.py  | 112 +++++++++
 airflow/utils/db.py                             |   4 +
 tests/contrib/hooks/spark_submit_hook.py        | 148 ++++++++++++
 .../contrib/operators/spark_submit_operator.py  |  75 ++++++
 7 files changed, 567 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/01494fd4/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index a16a3f7..19fc2b4 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -42,6 +42,7 @@ _hooks = {
     'datastore_hook': ['DatastoreHook'],
     'gcp_dataproc_hook': ['DataProcHook'],
     'gcp_dataflow_hook': ['DataFlowHook'],
+    'spark_submit_operator': ['SparkSubmitOperator'],
     'cloudant_hook': ['CloudantHook'],
     'fs_hook': ['FSHook']
 }

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/01494fd4/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
new file mode 100644
index 0000000..619cc71
--- /dev/null
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -0,0 +1,226 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import subprocess
+import re
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.exceptions import AirflowException
+
+log = logging.getLogger(__name__)
+
+
+class SparkSubmitHook(BaseHook):
+    """
+    This hook is a wrapper around the spark-submit binary to kick off a spark-submit job.
+    It requires that the "spark-submit" binary is in the PATH.
+    :param conf: Arbitrary Spark configuration properties
+    :type conf: dict
+    :param conn_id: The connection id as configured in Airflow administration. When an
+                    invalid connection_id is supplied, it will default to yarn.
+    :type conn_id: str
+    :param files: Upload additional files to the container running the job, separated by a
+                  comma. For example hive-site.xml.
+    :type files: str
+    :param py_files: Additional python files used by the job, can be .zip, .egg or .py.
+    :type py_files: str
+    :param jars: Submit additional jars to upload and place them in executor classpath.
+    :type jars: str
+    :param executor_cores: Number of cores per executor (Default: 2)
+    :type executor_cores: int
+    :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
+    :type executor_memory: str
+    :param keytab: Full path to the file that contains the keytab
+    :type keytab: str
+    :param principal: The name of the kerberos principal used for keytab
+    :type principal: str
+    :param name: Name of the job (default airflow-spark)
+    :type name: str
+    :param num_executors: Number of executors to launch
+    :type num_executors: int
+    :param verbose: Whether to pass the verbose flag to spark-submit process for debugging
+    :type verbose: bool
+    """
+
+    def __init__(self,
+                 conf=None,
+                 conn_id='spark_default',
+                 files=None,
+                 py_files=None,
+                 jars=None,
+                 executor_cores=None,
+                 executor_memory=None,
+                 keytab=None,
+                 principal=None,
+                 name='default-name',
+                 num_executors=None,
+                 verbose=False):
+        self._conf = conf
+        self._conn_id = conn_id
+        self._files = files
+        self._py_files = py_files
+        self._jars = jars
+        self._executor_cores = executor_cores
+        self._executor_memory = executor_memory
+        self._keytab = keytab
+        self._principal = principal
+        self._name = name
+        self._num_executors = num_executors
+        self._verbose = verbose
+        self._sp = None
+        self._yarn_application_id = None
+
+        (self._master, self._queue, self._deploy_mode) = self._resolve_connection()
+        self._is_yarn = 'yarn' in self._master
+
+    def _resolve_connection(self):
+        # Build from connection master or default to yarn if not available
+        master = 'yarn'
+        queue = None
+        deploy_mode = None
+
+        try:
+            # Master can be local, yarn, spark://HOST:PORT or mesos://HOST:PORT
+            conn = self.get_connection(self._conn_id)
+            if conn.port:
+                master = "{}:{}".format(conn.host, conn.port)
+            else:
+                master = conn.host
+
+            # Determine optional yarn queue from the extra field
+            extra = conn.extra_dejson
+            if 'queue' in extra:
+                queue = extra['queue']
+            if 'deploy-mode' in extra:
+                deploy_mode = extra['deploy-mode']
+        except AirflowException:
+            logging.debug(
+                "Could not load connection string {}, defaulting to {}".format(
+                    self._conn_id, master
+                )
+            )
+
+        return master, queue, deploy_mode
+
+    def get_conn(self):
+        pass
+
+    def _build_command(self, application):
+        """
+        Construct the spark-submit command to execute.
+        :param application: command to append to the spark-submit command
+        :type application: str
+        :return: full command to be executed
+        """
+        # The spark-submit binary needs to be in the path
+        connection_cmd = ["spark-submit"]
+
+        # The url ot the spark master
+        connection_cmd += ["--master", self._master]
+
+        if self._conf:
+            for key in self._conf:
+                connection_cmd += ["--conf", "{}={}".format(key, str(self._conf[key]))]
+        if self._files:
+            connection_cmd += ["--files", self._files]
+        if self._py_files:
+            connection_cmd += ["--py-files", self._py_files]
+        if self._jars:
+            connection_cmd += ["--jars", self._jars]
+        if self._num_executors:
+            connection_cmd += ["--num-executors", str(self._num_executors)]
+        if self._executor_cores:
+            connection_cmd += ["--executor-cores", str(self._executor_cores)]
+        if self._executor_memory:
+            connection_cmd += ["--executor-memory", self._executor_memory]
+        if self._keytab:
+            connection_cmd += ["--keytab", self._keytab]
+        if self._principal:
+            connection_cmd += ["--principal", self._principal]
+        if self._name:
+            connection_cmd += ["--name", self._name]
+        if self._verbose:
+            connection_cmd += ["--verbose"]
+        if self._queue:
+            connection_cmd += ["--queue", self._queue]
+        if self._deploy_mode:
+            connection_cmd += ["--deploy-mode", self._deploy_mode]
+
+        # The actual script to execute
+        connection_cmd += [application]
+
+        logging.debug("Spark-Submit cmd: {}".format(connection_cmd))
+
+        return connection_cmd
+
+    def submit(self, application="", **kwargs):
+        """
+        Remote Popen to execute the spark-submit job
+
+        :param application: Submitted application, jar or py file
+        :type application: str
+        :param kwargs: extra arguments to Popen (see subprocess.Popen)
+        """
+        spark_submit_cmd = self._build_command(application)
+        self._sp = subprocess.Popen(spark_submit_cmd,
+                                    stdout=subprocess.PIPE,
+                                    stderr=subprocess.PIPE,
+                                    **kwargs)
+
+        # Using two iterators here to support 'real-time' logging
+        sources = [self._sp.stdout, self._sp.stderr]
+
+        for source in sources:
+            self._process_log(iter(source.readline, b''))
+
+        output, stderr = self._sp.communicate()
+
+        if self._sp.returncode:
+            raise AirflowException(
+                "Cannot execute: {}. Error code is: {}. Output: {}, Stderr: {}".format(
+                    spark_submit_cmd, self._sp.returncode, output, stderr
+                )
+            )
+
+    def _process_log(self, itr):
+        """
+        Processes the log files and extracts useful information out of it
+
+        :param itr: An iterator which iterates over the input of the subprocess
+        """
+        # Consume the iterator
+        for line in itr:
+            line = line.decode('utf-8').strip()
+            # If we run yarn cluster mode, we want to extract the application id from
+            # the logs so we can kill the application when we stop it unexpectedly
+            if self._is_yarn and self._deploy_mode == 'cluster':
+                match = re.search('(application[0-9_]+)', line)
+                if match:
+                    self._yarn_application_id = match.groups()[0]
+
+            # Pass to logging
+            logging.info(line)
+
+    def on_kill(self):
+        if self._sp and self._sp.poll() is None:
+            logging.info('Sending kill signal to spark-submit')
+            self.sp.kill()
+
+            if self._yarn_application_id:
+                logging.info('Killing application on YARN')
+                yarn_kill = Popen("yarn application -kill {0}".format(self._yarn_application_id),
+                                  stdout=subprocess.PIPE,
+                                  stderr=subprocess.PIPE)
+                logging.info("YARN killed with return code: {0}".format(yarn_kill.wait()))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/01494fd4/airflow/contrib/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/__init__.py b/airflow/contrib/operators/__init__.py
index ae481ea..bef3433 100644
--- a/airflow/contrib/operators/__init__.py
+++ b/airflow/contrib/operators/__init__.py
@@ -36,6 +36,7 @@ _operators = {
     'vertica_operator': ['VerticaOperator'],
     'vertica_to_hive': ['VerticaToHiveTransfer'],
     'qubole_operator': ['QuboleOperator'],
+    'spark_submit_operator': ['SparkSubmitOperator'],
     'fs_operator': ['FileSensor']
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/01494fd4/airflow/contrib/operators/spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_submit_operator.py b/airflow/contrib/operators/spark_submit_operator.py
new file mode 100644
index 0000000..a5e6145
--- /dev/null
+++ b/airflow/contrib/operators/spark_submit_operator.py
@@ -0,0 +1,112 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+
+from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+log = logging.getLogger(__name__)
+
+
+class SparkSubmitOperator(BaseOperator):
+    """
+    This operator is a wrapper around the spark-submit binary and kicks off a spark-submit job.
+    It requires that the "spark-submit" binary is in the PATH.
+    :param application: The application that is submitted as a job; either a jar or a py file.
+    :type application: str
+    :param conf: Arbitrary Spark configuration properties
+    :type conf: dict
+    :param conn_id: The connection id as configured in Airflow administration. When an
+                    invalid conn_id is supplied, it will default to yarn.
+    :type conn_id: str
+    :param files: Upload additional files to the container running the job, separated by a
+                  comma. For example hive-site.xml.
+    :type files: str
+    :param py_files: Additional python files used by the job, can be .zip, .egg or .py.
+    :type py_files: str
+    :param jars: Submit additional jars to upload and place them in executor classpath.
+    :type jars: str
+    :param executor_cores: Number of cores per executor (Default: 2)
+    :type executor_cores: int
+    :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
+    :type executor_memory: str
+    :param keytab: Full path to the file that contains the keytab
+    :type keytab: str
+    :param principal: The name of the kerberos principal used for keytab
+    :type principal: str
+    :param name: Name of the job (default airflow-spark)
+    :type name: str
+    :param num_executors: Number of executors to launch
+    :type num_executors: int
+    :param verbose: Whether to pass the verbose flag to spark-submit process for debugging
+    :type verbose: bool
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 application='',
+                 conf=None,
+                 conn_id='spark_default',
+                 files=None,
+                 py_files=None,
+                 jars=None,
+                 executor_cores=None,
+                 executor_memory=None,
+                 keytab=None,
+                 principal=None,
+                 name='airflow-spark',
+                 num_executors=None,
+                 verbose=False,
+                 *args,
+                 **kwargs):
+        super(SparkSubmitOperator, self).__init__(*args, **kwargs)
+        self._application = application
+        self._conf = conf
+        self._files = files
+        self._py_files = py_files
+        self._jars = jars
+        self._executor_cores = executor_cores
+        self._executor_memory = executor_memory
+        self._keytab = keytab
+        self._principal = principal
+        self._name = name
+        self._num_executors = num_executors
+        self._verbose = verbose
+        self._hook = None
+        self._conn_id = conn_id
+
+    def execute(self, context):
+        """
+        Call the SparkSubmitHook to run the provided spark job
+        """
+        self._hook = SparkSubmitHook(
+            conf=self._conf,
+            conn_id=self._conn_id,
+            files=self._files,
+            py_files=self._py_files,
+            jars=self._jars,
+            executor_cores=self._executor_cores,
+            executor_memory=self._executor_memory,
+            keytab=self._keytab,
+            principal=self._principal,
+            name=self._name,
+            num_executors=self._num_executors,
+            verbose=self._verbose
+        )
+        self._hook.submit(self._application)
+
+    def on_kill(self):
+        self._hook.on_kill()
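
A minimal usage sketch for the operator added above (the dag id, dates and
the application path are made up for illustration, and only a subset of the
supported arguments is shown):

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

    dag = DAG('spark_submit_example',
              start_date=datetime(2017, 1, 1),
              schedule_interval='@daily')

    submit_job = SparkSubmitOperator(
        task_id='submit_pi_job',
        application='/path/to/pi.py',  # hypothetical pyspark application
        conn_id='spark_default',       # default connection created by initdb (see below)
        executor_cores=2,
        executor_memory='2g',
        name='airflow-spark-pi',
        verbose=True,
        dag=dag)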

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/01494fd4/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 2502219..977a949 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -192,6 +192,10 @@ def initdb():
             extra='{"region_name": "us-east-1"}'))
     merge_conn(
         models.Connection(
+            conn_id='spark_default', conn_type='spark',
+            host='yarn', extra='{"queue": "root.default"}'))
+    merge_conn(
+        models.Connection(
             conn_id='emr_default', conn_type='emr',
             extra='''
                 {   "Name": "default_job_flow_name",

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/01494fd4/tests/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/spark_submit_hook.py b/tests/contrib/hooks/spark_submit_hook.py
new file mode 100644
index 0000000..b18925a
--- /dev/null
+++ b/tests/contrib/hooks/spark_submit_hook.py
@@ -0,0 +1,148 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from airflow import configuration, models
+from airflow.utils import db
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook
+
+
+class TestSparkSubmitHook(unittest.TestCase):
+    _spark_job_file = 'test_application.py'
+    _config = {
+        'conf': {
+            'parquet.compression': 'SNAPPY'
+        },
+        'conn_id': 'default_spark',
+        'files': 'hive-site.xml',
+        'py_files': 'sample_library.py',
+        'jars': 'parquet.jar',
+        'executor_cores': 4,
+        'executor_memory': '22g',
+        'keytab': 'privileged_user.keytab',
+        'principal': 'user/spark@airflow.org',
+        'name': 'spark-job',
+        'num_executors': 10,
+        'verbose': True
+    }
+
+    def setUp(self):
+        configuration.load_test_config()
+        db.merge_conn(
+            models.Connection(
+                conn_id='spark_yarn_cluster', conn_type='spark',
+                host='yarn://yarn-mater', extra='{"queue": "root.etl", "deploy-mode": "cluster"}')
+        )
+        db.merge_conn(
+            models.Connection(
+                conn_id='spark_default_mesos', conn_type='spark',
+                host='mesos://host', port=5050)
+        )
+
+    def test_build_command(self):
+        hook = SparkSubmitHook(**self._config)
+
+        # The subprocess requires an array but we build the cmd by joining on a space
+        cmd = ' '.join(hook._build_command(self._spark_job_file))
+
+        # Check if the command gets built properly and everything exists.
+        assert self._spark_job_file in cmd
+
+        # Check all the parameters
+        assert "--files {}".format(self._config['files']) in cmd
+        assert "--py-files {}".format(self._config['py_files']) in cmd
+        assert "--jars {}".format(self._config['jars']) in cmd
+        assert "--executor-cores {}".format(self._config['executor_cores']) in cmd
+        assert "--executor-memory {}".format(self._config['executor_memory']) in cmd
+        assert "--keytab {}".format(self._config['keytab']) in cmd
+        assert "--principal {}".format(self._config['principal']) in cmd
+        assert "--name {}".format(self._config['name']) in cmd
+        assert "--num-executors {}".format(self._config['num_executors']) in cmd
+
+        # Check if all config settings are there
+        for k in self._config['conf']:
+            assert "--conf {0}={1}".format(k, self._config['conf'][k]) in cmd
+
+        if self._config['verbose']:
+            assert "--verbose" in cmd
+
+    def test_submit(self):
+        hook = SparkSubmitHook()
+
+        # We don't have spark-submit available, and this is hard to mock, so just accept
+        # an exception for now.
+        with self.assertRaises(AirflowException):
+            hook.submit(self._spark_job_file)
+
+    def test_resolve_connection(self):
+
+        # Default to the standard yarn connection because the conn_id does not exist
+        hook = SparkSubmitHook(conn_id='')
+        self.assertEqual(hook._resolve_connection(), ('yarn', None, None))
+        assert "--master yarn" in ' '.join(hook._build_command(self._spark_job_file))
+
+        # Default to the standard yarn connection
+        hook = SparkSubmitHook(conn_id='spark_default')
+        self.assertEqual(
+            hook._resolve_connection(),
+            ('yarn', 'root.default', None)
+        )
+        cmd = ' '.join(hook._build_command(self._spark_job_file))
+        assert "--master yarn" in cmd
+        assert "--queue root.default" in cmd
+
+        # Connect to a mesos master
+        hook = SparkSubmitHook(conn_id='spark_default_mesos')
+        self.assertEqual(
+            hook._resolve_connection(),
+            ('mesos://host:5050', None, None)
+        )
+
+        cmd = ' '.join(hook._build_command(self._spark_job_file))
+        assert "--master mesos://host:5050" in cmd
+
+        # Set specific queue and deploy mode
+        hook = SparkSubmitHook(conn_id='spark_yarn_cluster')
+        self.assertEqual(
+            hook._resolve_connection(),
+            ('yarn://yarn-master', 'root.etl', 'cluster')
+        )
+
+        cmd = ' '.join(hook._build_command(self._spark_job_file))
+        assert "--master yarn://yarn-master" in cmd
+        assert "--queue root.etl" in cmd
+        assert "--deploy-mode cluster" in cmd
+
+    def test_process_log(self):
+        # Must select yarn connection
+        hook = SparkSubmitHook(conn_id='spark_yarn_cluster')
+
+        log_lines = [
+            'SPARK_MAJOR_VERSION is set to 2, using Spark2',
+            'WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable',
+            'WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.',
+            'INFO Client: Requesting a new application from cluster with 10 NodeManagers',
+            'INFO Client: Submitting application application_1486558679801_1820 to ResourceManager'
+        ]
+
+        hook._process_log(log_lines)
+
+        assert hook._yarn_application_id == 'application_1486558679801_1820'
+
+
+if __name__ == '__main__':
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/01494fd4/tests/contrib/operators/spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/spark_submit_operator.py b/tests/contrib/operators/spark_submit_operator.py
new file mode 100644
index 0000000..c080f76
--- /dev/null
+++ b/tests/contrib/operators/spark_submit_operator.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import datetime
+
+from airflow import DAG, configuration
+from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
+
+DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+
+
+class TestSparkSubmitOperator(unittest.TestCase):
+    _config = {
+        'conf': {
+            'parquet.compression': 'SNAPPY'
+        },
+        'files': 'hive-site.xml',
+        'py_files': 'sample_library.py',
+        'jars': 'parquet.jar',
+        'executor_cores': 4,
+        'executor_memory': '22g',
+        'keytab': 'privileged_user.keytab',
+        'principal': 'user/spark@airflow.org',
+        'name': 'spark-job',
+        'num_executors': 10,
+        'verbose': True,
+        'application': 'test_application.py'
+    }
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE
+        }
+        self.dag = DAG('test_dag_id', default_args=args)
+
+    def test_execute(self, conn_id='spark_default'):
+        operator = SparkSubmitOperator(
+            task_id='spark_submit_job',
+            dag=self.dag,
+            **self._config
+        )
+
+        self.assertEqual(conn_id, operator._conn_id)
+
+        self.assertEqual(self._config['application'], operator._application)
+        self.assertEqual(self._config['conf'], operator._conf)
+        self.assertEqual(self._config['files'], operator._files)
+        self.assertEqual(self._config['py_files'], operator._py_files)
+        self.assertEqual(self._config['jars'], operator._jars)
+        self.assertEqual(self._config['executor_cores'], operator._executor_cores)
+        self.assertEqual(self._config['executor_memory'], operator._executor_memory)
+        self.assertEqual(self._config['keytab'], operator._keytab)
+        self.assertEqual(self._config['principal'], operator._principal)
+        self.assertEqual(self._config['name'], operator._name)
+        self.assertEqual(self._config['num_executors'], operator._num_executors)
+        self.assertEqual(self._config['verbose'], operator._verbose)
+
+
+if __name__ == '__main__':
+    unittest.main()


[19/28] incubator-airflow git commit: [AIRFLOW-910] Use parallel task execution for backfills

Posted by bo...@apache.org.
[AIRFLOW-910] Use parallel task execution for backfills

The refactor to use dag runs in backfills caused a regression in task
execution performance, as dag runs were executed sequentially. In
addition, backfills were non-deterministic due to the random execution
order of tasks, causing root tasks to be added to the not-ready list
too soon.

This updates the backfill logic as follows:
* Parallelize execution of tasks
* Use a leaf-first execution model
* Replace state updates from the executor with task-based updates only

Closes #2107 from bolkedebruin/AIRFLOW-910
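
The leaf-first model mentioned above boils down to walking the tasks in
topological order (upstream dependencies first) and only dispatching a task
once all of its upstream tasks have finished, instead of looping over dag
runs one by one. A standalone sketch of that ordering idea follows; the tiny
dependency map and the print statements are made up for illustration and are
not part of the patch:

    # task_id -> set of upstream task_ids
    deps = {
        'leave1': set(),
        'leave2': set(),
        'upstream_level_1': {'leave1', 'leave2'},
        'upstream_level_2': {'upstream_level_1'},
    }

    def topological_order(deps):
        remaining = dict(deps)
        order = []
        while remaining:
            # tasks with no unresolved upstream dependencies come first
            ready = [t for t, up in remaining.items()
                     if not up & set(remaining)]
            if not ready:
                raise ValueError('cyclic dependency in the task graph')
            for t in ready:
                order.append(t)
                del remaining[t]
        return order

    done = set()
    for task in topological_order(deps):
        if deps[task] <= done:          # all upstream tasks have finished
            print('dispatching', task)  # stand-in for queueing to the executor
            done.add(task)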


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dcc8ede5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dcc8ede5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dcc8ede5

Branch: refs/heads/v1-8-test
Commit: dcc8ede5c1a2f6819b151dd5ce839f0a0917313a
Parents: 8ffaadf
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Mar 11 09:40:38 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:33:52 2017 -0700

----------------------------------------------------------------------
 airflow/jobs.py                    | 385 +++++++++++++++++---------------
 airflow/models.py                  |  50 +++++
 tests/executor/test_executor.py    |  25 ++-
 tests/jobs.py                      |  48 ++++
 tests/models.py                    |  66 ++++++
 tests/operators/subdag_operator.py |   4 +-
 6 files changed, 393 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dcc8ede5/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index fedad55..b6913f3 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -211,6 +211,28 @@ class BaseJob(Base, LoggingMixin):
     def _execute(self):
         raise NotImplementedError("This method needs to be overridden")
 
+    @provide_session
+    def reset_state_for_orphaned_tasks(self, dag_run, session=None):
+        """
+        This function checks a DagRun for any task instances that are in
+        a scheduled or queued state but are not known by the executor. If
+        it finds any, it will reset their state to None so they will get
+        picked up again.
+        """
+        queued_tis = self.executor.queued_tasks
+
+        # also consider running as the state might not have changed in the db yet
+        running = self.executor.running
+        tis = list()
+        tis.extend(dag_run.get_task_instances(state=State.SCHEDULED, session=session))
+        tis.extend(dag_run.get_task_instances(state=State.QUEUED, session=session))
+
+        for ti in tis:
+            if ti.key not in queued_tis and ti.key not in running:
+                self.logger.debug("Rescheduling orphaned task {}".format(ti))
+                ti.state = State.NONE
+        session.commit()
+
 
 class DagFileProcessor(AbstractDagFileProcessor):
     """Helps call SchedulerJob.process_file() in a separate process."""
@@ -1236,28 +1258,6 @@ class SchedulerJob(BaseJob):
 
         self.logger.info(log_str)
 
-    @provide_session
-    def _reset_state_for_orphaned_tasks(self, dag_run, session=None):
-        """
-        This function checks for a DagRun if there are any tasks
-        that have a scheduled state but are not known by the
-        executor. If it finds those it will reset the state to None
-        so they will get picked up again.
-        """
-        queued_tis = self.executor.queued_tasks
-
-        # also consider running as the state might not have changed in the db yet
-        running = self.executor.running
-        tis = list()
-        tis.extend(dag_run.get_task_instances(state=State.SCHEDULED, session=session))
-        tis.extend(dag_run.get_task_instances(state=State.QUEUED, session=session))
-
-        for ti in tis:
-            if ti.key not in queued_tis and ti.key not in running:
-                self.logger.debug("Rescheduling orphaned task {}".format(ti))
-                ti.state = State.NONE
-        session.commit()
-
     def _execute(self):
         self.logger.info("Starting the scheduler")
         pessimistic_connection_handling()
@@ -1361,7 +1361,7 @@ class SchedulerJob(BaseJob):
         for dr in active_runs:
             self.logger.info("Resetting {} {}".format(dr.dag_id,
                                                       dr.execution_date))
-            self._reset_state_for_orphaned_tasks(dr, session=session)
+            self.reset_state_for_orphaned_tasks(dr, session=session)
 
         session.close()
 
@@ -1663,6 +1663,68 @@ class BackfillJob(BaseJob):
         self.pool = pool
         super(BackfillJob, self).__init__(*args, **kwargs)
 
+    def _update_counters(self, started, succeeded, skipped, failed, tasks_to_run):
+        """
+        Updates the counters per state of the task instances that were running
+        :param started: dict of started task instances, keyed by task instance key
+        :param succeeded: set of keys of task instances that succeeded
+        :param skipped: set of keys of task instances that were skipped
+        :param failed: set of keys of task instances that failed
+        :param tasks_to_run: dict of task instances that still have to run
+        """
+        for key, ti in list(started.items()):
+            ti.refresh_from_db()
+            if ti.state == State.SUCCESS:
+                succeeded.add(key)
+                self.logger.debug("Task instance {} succeeded. "
+                                  "Don't rerun.".format(ti))
+                started.pop(key)
+                continue
+            elif ti.state == State.SKIPPED:
+                skipped.add(key)
+                self.logger.debug("Task instance {} skipped. "
+                                  "Don't rerun.".format(ti))
+                started.pop(key)
+                continue
+            elif ti.state == State.FAILED:
+                self.logger.error("Task instance {} failed".format(ti))
+                failed.add(key)
+                started.pop(key)
+                continue
+            # special case: if the task needs to run again put it back
+            elif ti.state == State.UP_FOR_RETRY:
+                self.logger.warning("Task instance {} is up for retry"
+                                    .format(ti))
+                started.pop(key)
+                tasks_to_run[key] = ti
+
+    def _manage_executor_state(self, started):
+        """
+        Checks if the executor agrees with the state of task instances
+        that are running
+        :param started: dict of key, task to verify
+        """
+        executor = self.executor
+
+        for key, state in list(executor.get_event_buffer().items()):
+            if key not in started:
+                self.logger.warning("{} state {} not in started={}"
+                                    .format(key, state, started.values()))
+                continue
+
+            ti = started[key]
+            ti.refresh_from_db()
+
+            self.logger.debug("Executor state: {} task {}".format(state, ti))
+
+            if state == State.FAILED or state == State.SUCCESS:
+                if ti.state == State.RUNNING or ti.state == State.QUEUED:
+                    msg = ("Executor reports task instance {} finished ({}) "
+                           "although the task says its {}. Was the task "
+                           "killed externally?".format(ti, state, ti.state))
+                    self.logger.error(msg)
+                    ti.handle_failure(msg)
+
     def _execute(self):
         """
         Runs a dag for a specified date range.
@@ -1700,13 +1762,12 @@ class BackfillJob(BaseJob):
 
         executor = self.executor
         executor.start()
-        executor_fails = Counter()
 
         # Build a list of all instances to run
         tasks_to_run = {}
         failed = set()
         succeeded = set()
-        started = set()
+        started = {}
         skipped = set()
         not_ready = set()
         deadlocked = set()
@@ -1744,33 +1805,40 @@ class BackfillJob(BaseJob):
             run.state = State.RUNNING
             run.verify_integrity(session=session)
 
+            # check if we have orphaned tasks
+            self.reset_state_for_orphaned_tasks(dag_run=run, session=session)
+
             # for some reason if we dont refresh the reference to run is lost
             run.refresh_from_db()
             make_transient(run)
             active_dag_runs.append(run)
 
+            for ti in run.get_task_instances():
+                # all tasks part of the backfill are scheduled to run
+                ti.set_state(State.SCHEDULED, session=session)
+                tasks_to_run[ti.key] = ti
+
             next_run_date = self.dag.following_schedule(next_run_date)
 
-        run_count = 0
-        for run in active_dag_runs:
-            logging.info("Checking run {}".format(run))
-            run_count = run_count + 1
-
-            def get_task_instances_for_dag_run(dag_run):
-                # this needs a fresh session sometimes tis get detached
-                # can be more finegrained (excluding success or skipped)
-                tasks = {}
-                for ti in dag_run.get_task_instances():
-                    tasks[ti.key] = ti
-                return tasks
-
-            # Triggering what is ready to get triggered
-            while not deadlocked:
-                tasks_to_run = get_task_instances_for_dag_run(run)
-                self.logger.debug("Clearing out not_ready list")
-                not_ready.clear()
+        finished_runs = 0
+        total_runs = len(active_dag_runs)
+
+        # Triggering what is ready to get triggered
+        while (len(tasks_to_run) > 0 or len(started) > 0) and not deadlocked:
+            self.logger.debug("*** Clearing out not_ready list ***")
+            not_ready.clear()
 
+            # we need to execute the tasks bottom to top
+            # or leaf to root, as otherwise tasks might be
+            # determined deadlocked while they are actually
+            # waiting for their upstream to finish
+            for task in self.dag.topological_sort():
                 for key, ti in list(tasks_to_run.items()):
+                    if task.task_id != ti.task_id:
+                        continue
+
+                    ti.refresh_from_db()
+
                     task = self.dag.get_task(ti.task_id)
                     ti.task = task
 
@@ -1779,6 +1847,7 @@ class BackfillJob(BaseJob):
                         ti.execution_date == (start_date or ti.start_date))
                     self.logger.debug("Task instance to run {} state {}"
                                       .format(ti, ti.state))
+
                     # The task was already marked successful or skipped by a
                     # different Job. Don't rerun it.
                     if ti.state == State.SUCCESS:
@@ -1786,178 +1855,130 @@ class BackfillJob(BaseJob):
                         self.logger.debug("Task instance {} succeeded. "
                                           "Don't rerun.".format(ti))
                         tasks_to_run.pop(key)
+                        if key in started:
+                            started.pop(key)
                         continue
                     elif ti.state == State.SKIPPED:
                         skipped.add(key)
                         self.logger.debug("Task instance {} skipped. "
                                           "Don't rerun.".format(ti))
                         tasks_to_run.pop(key)
+                        if key in started:
+                            started.pop(key)
                         continue
                     elif ti.state == State.FAILED:
                         self.logger.error("Task instance {} failed".format(ti))
                         failed.add(key)
                         tasks_to_run.pop(key)
+                        if key in started:
+                            started.pop(key)
+                        continue
+                    elif ti.state == State.UPSTREAM_FAILED:
+                        self.logger.error("Task instance {} upstream failed".format(ti))
+                        failed.add(key)
+                        tasks_to_run.pop(key)
+                        if key in started:
+                            started.pop(key)
                         continue
-
                     backfill_context = DepContext(
                         deps=RUN_DEPS,
                         ignore_depends_on_past=ignore_depends_on_past,
                         ignore_task_deps=self.ignore_task_deps,
                         flag_upstream_failed=True)
+
                     # Is the task runnable? -- then run it
+                    # the dependency checker can change states of tis
                     if ti.are_dependencies_met(
                             dep_context=backfill_context,
                             session=session,
                             verbose=True):
-                        self.logger.debug('Sending {} to executor'.format(ti))
-                        if ti.state == State.NONE:
-                            ti.state = State.SCHEDULED
+                        ti.refresh_from_db(lock_for_update=True, session=session)
+                        if ti.state == State.SCHEDULED or ti.state == State.UP_FOR_RETRY:
+                            # Skip scheduled state, we are executing immediately
+                            ti.state = State.QUEUED
                             session.merge(ti)
+                            self.logger.debug('Sending {} to executor'.format(ti))
+                            executor.queue_task_instance(
+                                ti,
+                                mark_success=self.mark_success,
+                                pickle_id=pickle_id,
+                                ignore_task_deps=self.ignore_task_deps,
+                                ignore_depends_on_past=ignore_depends_on_past,
+                                pool=self.pool)
+                            started[key] = ti
+                            tasks_to_run.pop(key)
                         session.commit()
-                        executor.queue_task_instance(
-                            ti,
-                            mark_success=self.mark_success,
-                            pickle_id=pickle_id,
-                            ignore_task_deps=self.ignore_task_deps,
-                            ignore_depends_on_past=ignore_depends_on_past,
-                            pool=self.pool)
-                        started.add(key)
-
-                    # Mark the task as not ready to run
-                    elif ti.state in (State.NONE, State.UPSTREAM_FAILED):
-                        self.logger.debug('Adding {} to not_ready'.format(ti))
-                        not_ready.add(key)
-
-                    session.commit()
-
-                self.heartbeat()
-                executor.heartbeat()
-
-                # If the set of tasks that aren't ready ever equals the set of
-                # tasks to run, then the backfill is deadlocked
-                if not_ready and not_ready == set(tasks_to_run):
-                    self.logger.warn("Deadlock discovered for tasks_to_run={}"
-                                     .format(tasks_to_run.values()))
-                    deadlocked.update(tasks_to_run.values())
-                    tasks_to_run.clear()
-
-                # Reacting to events
-                for key, state in list(executor.get_event_buffer().items()):
-                    if key not in tasks_to_run:
-                        self.logger.warn("{} state {} not in tasks_to_run={}"
-                                         .format(key, state,
-                                                 tasks_to_run.values()))
                         continue
-                    ti = tasks_to_run[key]
-                    ti.refresh_from_db()
-                    logging.info("Executor state: {} task {}".format(state, ti))
-                    # executor reports failure
-                    if state == State.FAILED:
-
-                        # task reports running
-                        if ti.state == State.RUNNING:
-                            msg = (
-                                'Executor reports that task instance {} failed '
-                                'although the task says it is running.'.format(ti))
-                            self.logger.error(msg)
-                            ti.handle_failure(msg)
-                            tasks_to_run.pop(key)
 
-                        # task reports skipped
-                        elif ti.state == State.SKIPPED:
-                            self.logger.error("Skipping {} ".format(ti))
-                            skipped.add(key)
-                            tasks_to_run.pop(key)
+                    if ti.state == State.UPSTREAM_FAILED:
+                        self.logger.error("Task instance {} upstream failed".format(ti))
+                        failed.add(key)
+                        tasks_to_run.pop(key)
+                        if key in started:
+                            started.pop(key)
+                        continue
 
-                        # anything else is a failure
-                        else:
-                            self.logger.error("Task instance {} failed".format(ti))
-                            failed.add(key)
-                            tasks_to_run.pop(key)
+                    # all remaining tasks
+                    self.logger.debug('Adding {} to not_ready'.format(ti))
+                    not_ready.add(key)
 
-                    # executor reports success
-                    elif state == State.SUCCESS:
+            # execute the tasks in the queue
+            self.heartbeat()
+            executor.heartbeat()
 
-                        # task reports success
-                        if ti.state == State.SUCCESS:
-                            self.logger.info(
-                                'Task instance {} succeeded'.format(ti))
-                            succeeded.add(key)
-                            tasks_to_run.pop(key)
+            # If the set of tasks that aren't ready ever equals the set of
+            # tasks to run and there are no running tasks then the backfill
+            # is deadlocked
+            if not_ready and not_ready == set(tasks_to_run) and len(started) == 0:
+                self.logger.warning("Deadlock discovered for tasks_to_run={}"
+                                    .format(tasks_to_run.values()))
+                deadlocked.update(tasks_to_run.values())
+                tasks_to_run.clear()
 
-                        # task reports failure
-                        elif ti.state == State.FAILED:
-                            self.logger.error("Task instance {} failed".format(ti))
-                            failed.add(key)
-                            tasks_to_run.pop(key)
+            # check executor state
+            self._manage_executor_state(started)
 
-                        # task reports skipped
-                        elif ti.state == State.SKIPPED:
-                            self.logger.info("Task instance {} skipped".format(ti))
-                            skipped.add(key)
-                            tasks_to_run.pop(key)
-
-                        # this probably won't ever be triggered
-                        elif ti in not_ready:
-                            self.logger.info(
-                                "{} wasn't expected to run, but it did".format(ti))
-
-                        # executor reports success but task does not - this is weird
-                        elif ti.state not in (
-                                State.SCHEDULED,
-                                State.QUEUED,
-                                State.UP_FOR_RETRY):
-                            self.logger.error(
-                                "The airflow run command failed "
-                                "at reporting an error. This should not occur "
-                                "in normal circumstances. Task state is '{}',"
-                                "reported state is '{}'. TI is {}"
-                                "".format(ti.state, state, ti))
-
-                            # if the executor fails 3 or more times, stop trying to
-                            # run the task
-                            executor_fails[key] += 1
-                            if executor_fails[key] >= 3:
-                                msg = (
-                                    'The airflow run command failed to report an '
-                                    'error for task {} three or more times. The '
-                                    'task is being marked as failed. This is very '
-                                    'unusual and probably means that an error is '
-                                    'taking place before the task even '
-                                    'starts.'.format(key))
-                                self.logger.error(msg)
-                                ti.handle_failure(msg)
-                                tasks_to_run.pop(key)
-                msg = ' | '.join([
-                    "[backfill progress]",
-                    "dag run {6} of {7}",
-                    "tasks waiting: {0}",
-                    "succeeded: {1}",
-                    "kicked_off: {2}",
-                    "failed: {3}",
-                    "skipped: {4}",
-                    "deadlocked: {5}"
-                ]).format(
-                    len(tasks_to_run),
-                    len(succeeded),
-                    len(started),
-                    len(failed),
-                    len(skipped),
-                    len(deadlocked),
-                    run_count,
-                    len(active_dag_runs))
-                self.logger.info(msg)
-
-                self.logger.debug("Finished dag run loop iteration. "
-                                  "Remaining tasks {}"
-                                  .format(tasks_to_run.values()))
-                if len(tasks_to_run) == 0:
-                    break
+            # update the task counters
+            self._update_counters(started=started, succeeded=succeeded,
+                                  skipped=skipped, failed=failed,
+                                  tasks_to_run=tasks_to_run)
 
             # update dag run state
-            run.update_state(session=session)
-            if run.dag.is_paused:
-                models.DagStat.clean_dirty([run.dag_id], session=session)
+            _dag_runs = active_dag_runs[:]
+            for run in _dag_runs:
+                run.update_state(session=session)
+                if run.state in State.finished():
+                    finished_runs += 1
+                    active_dag_runs.remove(run)
+
+                if run.dag.is_paused:
+                    models.DagStat.clean_dirty([run.dag_id], session=session)
+
+            msg = ' | '.join([
+                "[backfill progress]",
+                "finished run {0} of {1}",
+                "tasks waiting: {2}",
+                "succeeded: {3}",
+                "kicked_off: {4}",
+                "failed: {5}",
+                "skipped: {6}",
+                "deadlocked: {7}",
+                "not ready: {8}"
+            ]).format(
+                finished_runs,
+                total_runs,
+                len(tasks_to_run),
+                len(succeeded),
+                len(started),
+                len(failed),
+                len(skipped),
+                len(deadlocked),
+                len(not_ready))
+            self.logger.info(msg)
+
+            self.logger.debug("Finished dag run loop iteration. "
+                              "Remaining tasks {}"
+                              .format(tasks_to_run.values()))
 
         executor.end()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dcc8ede5/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 3fef407..e63da3e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3018,6 +3018,56 @@ class DAG(BaseDag, LoggingMixin):
     def roots(self):
         return [t for t in self.tasks if not t.downstream_list]
 
+    def topological_sort(self):
+        """
+        Sorts tasks in topological order, such that a task comes after any of its
+        upstream dependencies.
+
+        Heavily inspired by:
+        http://blog.jupo.org/2012/04/06/topological-sorting-acyclic-directed-graphs/
+        :returns: tuple of tasks in topological order
+        """
+
+        # copy the tasks so we leave them unmodified
+        graph_unsorted = self.tasks[:]
+
+        graph_sorted = []
+
+        # special case
+        if len(self.tasks) == 0:
+            return tuple(graph_sorted)
+
+        # Run until the unsorted graph is empty.
+        while graph_unsorted:
+            # Go through each of the nodes in the unsorted graph. If a
+            # node has no upstream tasks that are still unresolved, that
+            # is, still in the unsorted graph, remove it from the unsorted
+            # graph and append it to the sorted graph. Iterating over a
+            # copy of the unsorted graph allows us to modify it as we move
+            # through it. We also keep a flag for checking that the graph
+            # is acyclic, which is true if any nodes are resolved during
+            # each pass through the graph. If not, we need to bail out as
+            # the graph therefore can't be sorted.
+            acyclic = False
+            for node in list(graph_unsorted):
+                for edge in node.upstream_list:
+                    if edge in graph_unsorted:
+                        break
+                # no edges in upstream tasks
+                else:
+                    acyclic = True
+                    graph_unsorted.remove(node)
+                    graph_sorted.append(node)
+
+            if not acyclic:
+                raise AirflowException("A cyclic dependency occurred in dag: {}"
+                                       .format(self.dag_id))
+
+        return tuple(graph_sorted)
+
     @provide_session
     def set_dag_runs_state(
             self, state=State.RUNNING, session=None):
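
A quick usage sketch for the new DAG.topological_sort() method added above
(the dag id and task ids below are made up; the structure mirrors the unit
test added further down):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG('topo_example', start_date=datetime(2017, 1, 1))

    with dag:
        extract = DummyOperator(task_id='extract')
        transform = DummyOperator(task_id='transform')
        load = DummyOperator(task_id='load')
        extract.set_downstream(transform)
        transform.set_downstream(load)

    # returns a tuple with every task appearing after its upstream tasks,
    # i.e. (extract, transform, load) for this linear dag
    print([t.task_id for t in dag.topological_sort()])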

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dcc8ede5/tests/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/executor/test_executor.py b/tests/executor/test_executor.py
index 2015d9c..9ec6cd4 100644
--- a/tests/executor/test_executor.py
+++ b/tests/executor/test_executor.py
@@ -12,18 +12,41 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from airflow.executors.base_executor import BaseExecutor
+from airflow.utils.state import State
+
+from airflow import settings
 
 
 class TestExecutor(BaseExecutor):
     """
     TestExecutor is used for unit testing purposes.
     """
+    def __init__(self, do_update=False, *args, **kwargs):
+        self.do_update = do_update
+        self._running = []
+        self.history = []
+
+        super(TestExecutor, self).__init__(*args, **kwargs)
+
     def execute_async(self, key, command, queue=None):
         self.logger.debug("{} running task instances".format(len(self.running)))
         self.logger.debug("{} in queue".format(len(self.queued_tasks)))
 
     def heartbeat(self):
-        pass
+        session = settings.Session()
+        if self.do_update:
+            self.history.append(list(self.queued_tasks.values()))
+            while len(self._running) > 0:
+                ti = self._running.pop()
+                ti.set_state(State.SUCCESS, session)
+            for key, val in list(self.queued_tasks.items()):
+                (command, priority, queue, ti) = val
+                ti.set_state(State.RUNNING, session)
+                self._running.append(ti)
+                self.queued_tasks.pop(key)
+
+        session.commit()
+        session.close()
 
     def terminate(self):
         pass

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dcc8ede5/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 1f7950e..1acf269 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -162,6 +162,54 @@ class BackfillJobTest(unittest.TestCase):
                 ignore_first_depends_on_past=True)
             job.run()
 
+    def test_backfill_ordered_concurrent_execute(self):
+        dag = DAG(
+            dag_id='test_backfill_ordered_concurrent_execute',
+            start_date=DEFAULT_DATE,
+            schedule_interval="@daily")
+
+        with dag:
+            op1 = DummyOperator(task_id='leave1')
+            op2 = DummyOperator(task_id='leave2')
+            op3 = DummyOperator(task_id='upstream_level_1')
+            op4 = DummyOperator(task_id='upstream_level_2')
+            op5 = DummyOperator(task_id='upstream_level_3')
+            # order randomly
+            op2.set_downstream(op3)
+            op1.set_downstream(op3)
+            op4.set_downstream(op5)
+            op3.set_downstream(op4)
+
+        dag.clear()
+
+        executor = TestExecutor(do_update=True)
+        job = BackfillJob(dag=dag,
+                          executor=executor,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+                          )
+        job.run()
+
+        # test executor history keeps a list
+        history = executor.history
+
+        # check that the order is right. Every loop has a 'pause' (zero queued
+        # tasks) to change state from RUNNING to SUCCESS.
+        # 6,0,3,0,3,0,3,0 = 8 loops
+        self.assertEqual(8, len(history))
+
+        loop_count = 0
+
+        while len(history) > 0:
+            queued_tasks = history.pop(0)
+            if loop_count == 0:
+                # first loop should contain 6 tasks (3 days x 2 tasks)
+                self.assertEqual(6, len(queued_tasks))
+            if loop_count == 2 or loop_count == 4 or loop_count == 6:
+                # 3 days x 1 task
+                self.assertEqual(3, len(queued_tasks))
+            loop_count += 1
+
     def test_backfill_pooled_tasks(self):
         """
         Test that queued tasks are executed by BackfillJob

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dcc8ede5/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index d904ff3..55117d4 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -18,6 +18,7 @@ from __future__ import print_function
 from __future__ import unicode_literals
 
 import datetime
+import logging
 import os
 import unittest
 import time
@@ -118,6 +119,71 @@ class DagTest(unittest.TestCase):
         self.assertEqual(dag.dag_id, 'creating_dag_in_cm')
         self.assertEqual(dag.tasks[0].task_id, 'op6')
 
+    def test_dag_topological_sort(self):
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # A -> B
+        # A -> C -> D
+        # ordered: B, D, C, A or D, B, C, A or D, C, B, A
+        with dag:
+            op1 = DummyOperator(task_id='A')
+            op2 = DummyOperator(task_id='B')
+            op3 = DummyOperator(task_id='C')
+            op4 = DummyOperator(task_id='D')
+            op1.set_upstream([op2, op3])
+            op3.set_upstream(op4)
+
+        topological_list = dag.topological_sort()
+        logging.info(topological_list)
+
+        tasks = [op2, op3, op4]
+        self.assertTrue(topological_list[0] in tasks)
+        tasks.remove(topological_list[0])
+        self.assertTrue(topological_list[1] in tasks)
+        tasks.remove(topological_list[1])
+        self.assertTrue(topological_list[2] in tasks)
+        tasks.remove(topological_list[2])
+        self.assertTrue(topological_list[3] == op1)
+
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # C -> (A u B) -> D
+        # C -> E
+        # ordered: E | D, A | B, C
+        with dag:
+            op1 = DummyOperator(task_id='A')
+            op2 = DummyOperator(task_id='B')
+            op3 = DummyOperator(task_id='C')
+            op4 = DummyOperator(task_id='D')
+            op5 = DummyOperator(task_id='E')
+            op1.set_downstream(op3)
+            op2.set_downstream(op3)
+            op1.set_upstream(op4)
+            op2.set_upstream(op4)
+            op5.set_downstream(op3)
+
+        topological_list = dag.topological_sort()
+        logging.info(topological_list)
+
+        self.assertTrue(topological_list[0] == op5 or topological_list[0] == op4)
+        self.assertTrue(topological_list[1] == op4 or topological_list[1] == op5)
+        self.assertTrue(topological_list[2] == op1 or topological_list[2] == op2)
+        self.assertTrue(topological_list[3] == op1 or topological_list[3] == op2)
+        self.assertTrue(topological_list[4] == op3)
+
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        self.assertEquals(tuple(), dag.topological_sort())
+
 
 class DagRunTest(unittest.TestCase):
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dcc8ede5/tests/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index 6a25ac3..6f6847c 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -91,8 +91,8 @@ class SubDagOperatorTests(unittest.TestCase):
         subdag = dagbag.get_dag('test_subdag_deadlock.subdag')
         subdag.clear()
 
-        # first make sure subdag is deadlocked
-        self.assertRaisesRegexp(AirflowException, 'deadlocked', subdag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+        # first make sure subdag has failed
+        self.assertRaises(AirflowException, subdag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
         # now make sure dag picks up the subdag error
         self.assertRaises(AirflowException, dag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)


[11/28] incubator-airflow git commit: [AIRFLOW-919] Running tasks with no start date shouldn't break a DAG's UI

Posted by bo...@apache.org.
[AIRFLOW-919] Running tasks with no start date shouldn't break a DAG's UI

Please accept this PR that addresses the following issues:
- https://issues.apache.org/jira/browse/AIRFLOW-919

I also made the airflow PR template a little bit less verbose
(it requires fewer edits when creating a PR).

Testing Done:
- Ran a webserver with this case and made sure that the DAG page loaded

Closes #2110 from aoen/ddavydov/fix_running_task_with_no_start_date
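
A small standalone sketch of the guard being added in views.py below: a task
that is reported as running but has no start_date yet is simply left without
a duration instead of raising. The sample dicts are made up for illustration:

    from datetime import datetime

    import dateutil.parser

    def set_duration(tid):
        # only compute a duration when the task is running AND has a start date
        if (isinstance(tid, dict) and tid.get("state") == "running" and
                tid.get("start_date") is not None):
            d = datetime.now() - dateutil.parser.parse(tid["start_date"])
            tid["duration"] = d.total_seconds()
        return tid

    print(set_duration({"state": "running", "start_date": None}))
    print(set_duration({"state": "running", "start_date": "2017-03-12T08:20:00"}))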


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ab37f8d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ab37f8d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ab37f8d3

Branch: refs/heads/v1-8-test
Commit: ab37f8d32ef9dcf3163a037b53ca749f2f99f22e
Parents: 01494fd
Author: Dan Davydov <da...@airbnb.com>
Authored: Mon Feb 27 13:43:25 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:20:00 2017 -0700

----------------------------------------------------------------------
 .github/PULL_REQUEST_TEMPLATE.md | 6 +-----
 airflow/www/views.py             | 3 ++-
 2 files changed, 3 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ab37f8d3/.github/PULL_REQUEST_TEMPLATE.md
----------------------------------------------------------------------
diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index 5681a89..b92e29a 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -1,9 +1,5 @@
-Dear Airflow Maintainers,
-
 Please accept this PR that addresses the following issues:
-- *(replace with a link to AIRFLOW-X)*
-
-Per Apache guidelines you need to create a [Jira issue](https://issues.apache.org/jira/browse/AIRFLOW/).
+- *(MANDATORY - replace with a link to JIRA - e.g. https://issues.apache.org/jira/browse/AIRFLOW-XXX)*
 
 Testing Done:
 - Unittests are required, if you do not include new unit tests please

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ab37f8d3/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index bda4921..86b1291 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1205,7 +1205,8 @@ class Airflow(BaseView):
                 children_key = "_children"
 
             def set_duration(tid):
-                if isinstance(tid, dict) and tid.get("state") == State.RUNNING:
+                if (isinstance(tid, dict) and tid.get("state") == State.RUNNING and
+                        tid["start_date"] is not None):
                     d = datetime.now() - dateutil.parser.parse(tid["start_date"])
                     tid["duration"] = d.total_seconds()
                 return tid


[02/28] incubator-airflow git commit: [AIRFLOW-694] Fix config behaviour for empty envvar

Posted by bo...@apache.org.
[AIRFLOW-694] Fix config behaviour for empty envvar

Currently, an environment variable with an empty value does not
overwrite the corresponding configuration value.

Closes #2044 from sekikn/AIRFLOW-694
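
A small sketch of the behaviour being fixed (the pattern mirrors the one-line
change in configuration.py below; the variable handling here is simplified and
not the actual AirflowConfigParser code):

    import os

    os.environ['AIRFLOW__CORE__FERNET_KEY'] = ''   # explicitly set to empty

    option = os.environ.get('AIRFLOW__CORE__FERNET_KEY')

    # old check: an empty string is falsy, so the empty envvar was ignored
    if option:
        print('old check would return the envvar value')

    # new check: only a missing envvar (None) falls through to the config file
    if option is not None:
        print('new check returns the (empty) envvar value')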


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5405f5f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5405f5f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5405f5f8

Branch: refs/heads/v1-8-test
Commit: 5405f5f83c6e20fff2dc209cd4be3d1d5ea85140
Parents: a7abcf3
Author: Kengo Seki <se...@nttdata.co.jp>
Authored: Thu Feb 2 14:38:29 2017 +0100
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 07:56:15 2017 -0700

----------------------------------------------------------------------
 airflow/configuration.py |  2 +-
 tests/core.py            | 24 ++++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5405f5f8/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 011f764..404808b 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -591,7 +591,7 @@ class AirflowConfigParser(ConfigParser):
 
         # first check environment variables
         option = self._get_env_var_option(section, key)
-        if option:
+        if option is not None:
             return option
 
         # ...then the config file

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5405f5f8/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index e35809d..3e76e81 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -776,6 +776,30 @@ class CoreTest(unittest.TestCase):
         configuration.set("core", "FERNET_KEY", FERNET_KEY)
         assert configuration.has_option("core", "FERNET_KEY")
 
+    def test_config_override_original_when_non_empty_envvar_is_provided(self):
+        key = "AIRFLOW__CORE__FERNET_KEY"
+        value = "some value"
+        assert key not in os.environ
+
+        os.environ[key] = value
+        FERNET_KEY = configuration.get('core', 'FERNET_KEY')
+        assert FERNET_KEY == value
+
+        # restore the envvar back to the original state
+        del os.environ[key]
+
+    def test_config_override_original_when_empty_envvar_is_provided(self):
+        key = "AIRFLOW__CORE__FERNET_KEY"
+        value = ""
+        assert key not in os.environ
+
+        os.environ[key] = value
+        FERNET_KEY = configuration.get('core', 'FERNET_KEY')
+        assert FERNET_KEY == value
+
+        # restore the envvar back to the original state
+        del os.environ[key]
+
     def test_class_with_logger_should_have_logger_with_correct_name(self):
 
         # each class should automatically receive a logger with a correct name


[12/28] incubator-airflow git commit: [AIRFLOW-925] Revert airflow.hooks change that cherry-pick picked

Posted by bo...@apache.org.
[AIRFLOW-925] Revert airflow.hooks change that cherry-pick picked

Please accept this PR that addresses the following issues:
- https://issues.apache.org/jira/browse/AIRFLOW-925

Testing Done:
- Fixes bug in prod

Closes #2112 from saguziel/aguziel-hivemetastorehook-import-apache


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f04ea97d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f04ea97d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f04ea97d

Branch: refs/heads/v1-8-test
Commit: f04ea97d066093abf898fec81f96eeb4b82eaf13
Parents: ab37f8d
Author: Li Xuanji <xu...@airbnb.com>
Authored: Tue Feb 28 12:17:33 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:20:19 2017 -0700

----------------------------------------------------------------------
 airflow/operators/sensors.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f04ea97d/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 5fbd21c..c0aba27 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -300,7 +300,7 @@ class NamedHivePartitionSensor(BaseSensorOperator):
     def poke(self, context):
 
         if not hasattr(self, 'hook'):
-            self.hook = airflow.hooks.hive_hooks.HiveMetastoreHook(
+            self.hook = hooks.HiveMetastoreHook(
                 metastore_conn_id=self.metastore_conn_id)
 
         def poke_partition(partition):
@@ -369,7 +369,7 @@ class HivePartitionSensor(BaseSensorOperator):
             'Poking for table {self.schema}.{self.table}, '
             'partition {self.partition}'.format(**locals()))
         if not hasattr(self, 'hook'):
-            self.hook = airflow.hooks.hive_hooks.HiveMetastoreHook(
+            self.hook = hooks.HiveMetastoreHook(
                 metastore_conn_id=self.metastore_conn_id)
         return self.hook.check_for_partition(
             self.schema, self.table, self.partition)


[17/28] incubator-airflow git commit: [AIRFLOW-941] Use defined parameters for psycopg2

Posted by bo...@apache.org.
[AIRFLOW-941] Use defined parameters for psycopg2

This works around https://github.com/psycopg/psycopg2/issues/517.

Closes #2126 from bolkedebruin/AIRFLOW-941


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1f3aead5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1f3aead5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1f3aead5

Branch: refs/heads/v1-8-test
Commit: 1f3aead5c486c3576a5df3b6904aa449b8a1d90a
Parents: 4077c6d
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Mon Mar 6 21:03:14 2017 +0100
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:28:48 2017 -0700

----------------------------------------------------------------------
 airflow/hooks/postgres_hook.py | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1f3aead5/airflow/hooks/postgres_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/postgres_hook.py b/airflow/hooks/postgres_hook.py
index 75c8226..750ebbb 100644
--- a/airflow/hooks/postgres_hook.py
+++ b/airflow/hooks/postgres_hook.py
@@ -32,10 +32,17 @@ class PostgresHook(DbApiHook):
         conn = self.get_connection(self.postgres_conn_id)
         conn_args = dict(
             host=conn.host,
-            user=conn.login,
-            password=conn.password,
-            dbname=conn.schema,
-            port=conn.port)
+            dbname=self.schema or conn.schema)
+        # work around for https://github.com/psycopg/psycopg2/issues/517
+        # todo: remove when psycopg2 2.7.1 is released
+        # https://issues.apache.org/jira/browse/AIRFLOW-945
+        if conn.port:
+            conn_args['port'] = conn.port
+        if conn.login:
+            conn_args['user'] = conn.login
+        if conn.password:
+            conn_args['password'] = conn.password
+
         # check for ssl parameters in conn.extra
         for arg_name, arg_val in conn.extra_dejson.items():
             if arg_name in ['sslmode', 'sslcert', 'sslkey', 'sslrootcert', 'sslcrl', 'application_name']:
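
For context, a standalone sketch of the pattern the hunk introduces (illustrative names, not Airflow code; assumes psycopg2 is installed if the commented call is enabled): only keys with real values are passed, so libpq defaults apply for the rest and the empty-value problem from psycopg2 issue 517 is avoided.

    def build_conn_args(host, dbname, port=None, login=None, password=None):
        # Start with the parameters that are always defined...
        conn_args = dict(host=host, dbname=dbname)
        # ...and add the optional ones only when they are actually set.
        if port:
            conn_args['port'] = port
        if login:
            conn_args['user'] = login
        if password:
            conn_args['password'] = password
        return conn_args

    # import psycopg2
    # conn = psycopg2.connect(**build_conn_args('localhost', 'airflow', login='airflow'))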


[26/28] incubator-airflow git commit: Remove remnants

Posted by bo...@apache.org.
Remove remnants


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3927e00d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3927e00d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3927e00d

Branch: refs/heads/v1-8-test
Commit: 3927e00dc72f6f2d14e463ff8daba3e3bcb11b73
Parents: 8df046b
Author: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Authored: Sun Mar 12 10:33:49 2017 -0700
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 10:33:49 2017 -0700

----------------------------------------------------------------------
 tests/executor/__init__.py      | 13 ---------
 tests/executor/test_executor.py | 56 ------------------------------------
 2 files changed, 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3927e00d/tests/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/executor/__init__.py b/tests/executor/__init__.py
deleted file mode 100644
index a85b772..0000000
--- a/tests/executor/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3927e00d/tests/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/executor/test_executor.py b/tests/executor/test_executor.py
deleted file mode 100644
index 9ec6cd4..0000000
--- a/tests/executor/test_executor.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from airflow.executors.base_executor import BaseExecutor
-from airflow.utils.state import State
-
-from airflow import settings
-
-
-class TestExecutor(BaseExecutor):
-    """
-    TestExecutor is used for unit testing purposes.
-    """
-    def __init__(self, do_update=False, *args, **kwargs):
-        self.do_update = do_update
-        self._running = []
-        self.history = []
-
-        super(TestExecutor, self).__init__(*args, **kwargs)
-
-    def execute_async(self, key, command, queue=None):
-        self.logger.debug("{} running task instances".format(len(self.running)))
-        self.logger.debug("{} in queue".format(len(self.queued_tasks)))
-
-    def heartbeat(self):
-        session = settings.Session()
-        if self.do_update:
-            self.history.append(list(self.queued_tasks.values()))
-            while len(self._running) > 0:
-                ti = self._running.pop()
-                ti.set_state(State.SUCCESS, session)
-            for key, val in list(self.queued_tasks.items()):
-                (command, priority, queue, ti) = val
-                ti.set_state(State.RUNNING, session)
-                self._running.append(ti)
-                self.queued_tasks.pop(key)
-
-        session.commit()
-        session.close()
-
-    def terminate(self):
-        pass
-
-    def end(self):
-        self.sync()
-


[27/28] incubator-airflow git commit: Fix postgres hook

Posted by bo...@apache.org.
Fix postgres hook


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f171d17e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f171d17e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f171d17e

Branch: refs/heads/v1-8-test
Commit: f171d17e8b5ef698f487bed8a40c6dd21ed81b51
Parents: 3927e00
Author: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Authored: Sun Mar 12 10:34:19 2017 -0700
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 10:34:19 2017 -0700

----------------------------------------------------------------------
 airflow/hooks/postgres_hook.py | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f171d17e/airflow/hooks/postgres_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/postgres_hook.py b/airflow/hooks/postgres_hook.py
index 750ebbb..584930d 100644
--- a/airflow/hooks/postgres_hook.py
+++ b/airflow/hooks/postgres_hook.py
@@ -28,6 +28,10 @@ class PostgresHook(DbApiHook):
     default_conn_name = 'postgres_default'
     supports_autocommit = True
 
+    def __init__(self, *args, **kwargs):
+        super(PostgresHook, self).__init__(*args, **kwargs)
+        self.schema = kwargs.pop("schema", None)
+
     def get_conn(self):
         conn = self.get_connection(self.postgres_conn_id)
         conn_args = dict(
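
A hedged usage sketch of the new keyword (the connection id is Airflow's shipped default; the schema value is illustrative): a hook-level schema, when given, takes precedence over the connection's schema via `dbname=self.schema or conn.schema`.

    from airflow.hooks.postgres_hook import PostgresHook

    # Target the 'analytics' database instead of the one stored on the connection.
    hook = PostgresHook(postgres_conn_id='postgres_default', schema='analytics')
    # conn = hook.get_conn()  # would connect using dbname=self.schema or conn.schema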


[13/28] incubator-airflow git commit: [AIRFLOW-933] Use ast.literal_eval rather than eval because ast.literal_eval does not execute input.

Posted by bo...@apache.org.
[AIRFLOW-933] Use ast.literal_eval rather than eval because ast.literal_eval does not execute input.

This PR addresses the following issues:
- https://issues.apache.org/jira/browse/AIRFLOW-933

This PR addresses a security issue. It was tested by setting up a local web server and reproducing the issue described in the JIRA link above.

Closes #2117 from amaliujia/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0964f189
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0964f189
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0964f189

Branch: refs/heads/v1-8-test
Commit: 0964f189f2cd2ac10150040670a542910370e456
Parents: f04ea97
Author: Rui Wang <ru...@airbnb.com>
Authored: Wed Mar 1 14:03:34 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:21:01 2017 -0700

----------------------------------------------------------------------
 airflow/www/views.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0964f189/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 86b1291..d8acfef 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -44,6 +44,7 @@ from flask._compat import PY2
 import jinja2
 import markdown
 import nvd3
+import ast
 
 from wtforms import (
     Form, SelectField, TextAreaField, PasswordField, StringField, validators)
@@ -168,7 +169,7 @@ def nobr_f(v, c, m, p):
 
 def label_link(v, c, m, p):
     try:
-        default_params = eval(m.default_params)
+        default_params = ast.literal_eval(m.default_params)
     except:
         default_params = {}
     url = url_for(
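
To illustrate why the swap matters, a minimal sketch (the payload string is illustrative, not actual Airflow data): eval() executes arbitrary expressions, while ast.literal_eval() only accepts Python literals and rejects anything else.

    import ast

    payload = "__import__('os').getcwd()"

    eval(payload)                  # executes the expression -- arbitrary code runs
    try:
        ast.literal_eval(payload)  # only parses literals (dicts, lists, strings, numbers)
    except ValueError:
        print("rejected: not a literal")

    print(ast.literal_eval("{'pool': 'default', 'retries': 1}"))  # plain literals still work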


[09/28] incubator-airflow git commit: [AIRFLOW-897] Prevent dagruns from failing with unfinished tasks

Posted by bo...@apache.org.
[AIRFLOW-897] Prevent dagruns from failing with unfinished tasks

Closes #2099 from aoen/ddavydov/fix_premature_dagrun_failures


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c29af466
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c29af466
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c29af466

Branch: refs/heads/v1-8-test
Commit: c29af4668a67b5d7f969140549558714fb7b32c9
Parents: ff0fa00
Author: Dan Davydov <da...@airbnb.com>
Authored: Fri Feb 24 14:29:11 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:17:40 2017 -0700

----------------------------------------------------------------------
 airflow/models.py             |  6 +++---
 tests/dags/test_issue_1225.py | 13 +++++++++++++
 tests/jobs.py                 | 24 ++++++++++++++++++++++++
 3 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c29af466/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 1829ff3..3fef407 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3993,12 +3993,12 @@ class DagRun(Base):
 
         # future: remove the check on adhoc tasks (=active_tasks)
         if len(tis) == len(dag.active_tasks):
-            # if any roots failed, the run failed
             root_ids = [t.task_id for t in dag.roots]
             roots = [t for t in tis if t.task_id in root_ids]
 
-            if any(r.state in (State.FAILED, State.UPSTREAM_FAILED)
-                   for r in roots):
+            # if all roots finished and at least one failed, the run failed
+            if (not unfinished_tasks and
+                    any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
                 logging.info('Marking run {} failed'.format(self))
                 self.state = State.FAILED
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c29af466/tests/dags/test_issue_1225.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
index 021561f..d01fd79 100644
--- a/tests/dags/test_issue_1225.py
+++ b/tests/dags/test_issue_1225.py
@@ -129,3 +129,16 @@ dag7_subdag1 = SubDagOperator(
     subdag=subdag7)
 subdag7_task1.set_downstream(subdag7_task2)
 subdag7_task2.set_downstream(subdag7_task3)
+
+# DAG tests that a Dag run that doesn't complete but has a root failure is marked running
+dag8 = DAG(dag_id='test_dagrun_states_root_fail_unfinished', default_args=default_args)
+dag8_task1 = DummyOperator(
+    task_id='test_dagrun_unfinished',  # The test will unset the task instance state after
+                                       # running this test
+    dag=dag8,
+)
+dag8_task2 = PythonOperator(
+    task_id='test_dagrun_fail',
+    dag=dag8,
+    python_callable=fail,
+)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c29af466/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index e520b44..1f7950e 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -358,6 +358,30 @@ class SchedulerJobTest(unittest.TestCase):
             },
             dagrun_state=State.FAILED)
 
+    def test_dagrun_root_fail_unfinished(self):
+        """
+        DagRuns with one unfinished and one failed root task -> RUNNING
+        """
+        # Run both the failed and successful tasks
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        dag_id = 'test_dagrun_states_root_fail_unfinished'
+        dag = self.dagbag.get_dag(dag_id)
+        dag.clear()
+        dr = scheduler.create_dag_run(dag)
+        try:
+            dag.run(start_date=dr.execution_date, end_date=dr.execution_date)
+        except AirflowException:  # Expect an exception since there is a failed task
+            pass
+
+        # Mark the successful task as never having run since we want to see if the
+                                       # dagrun will be in a running state despite having an unfinished task.
+        session = settings.Session()
+        ti = dr.get_task_instance('test_dagrun_unfinished', session=session)
+        ti.state = State.NONE
+        session.commit()
+        dr_state = dr.update_state()
+        self.assertEqual(dr_state, State.RUNNING)
+
     def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self):
         """
         DagRun is marked a success if ignore_first_depends_on_past=True
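
A hedged sketch of the decision the models.py hunk encodes (plain strings instead of Airflow's State constants): a run is only marked failed once no unfinished tasks remain and at least one root finished in a failed state.

    FAILED_ROOT_STATES = {'failed', 'upstream_failed'}

    def should_fail_run(root_states, unfinished_tasks):
        # Previously a single failed root was enough; now every task must be
        # finished before the failed verdict is reached.
        return not unfinished_tasks and any(
            state in FAILED_ROOT_STATES for state in root_states)

    assert not should_fail_run(['failed', 'running'], unfinished_tasks=['some_ti'])
    assert should_fail_run(['failed', 'success'], unfinished_tasks=[])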


[23/28] incubator-airflow git commit: Fix tests for topological sort

Posted by bo...@apache.org.
Fix tests for topological sort


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/57faa530
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/57faa530
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/57faa530

Branch: refs/heads/v1-8-test
Commit: 57faa530f7e9580cda9bb0200d40af15d323df24
Parents: 1243ab1
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Mar 11 13:26:39 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:34:53 2017 -0700

----------------------------------------------------------------------
 tests/models.py | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/57faa530/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 55117d4..ffd1f31 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -171,10 +171,20 @@ class DagTest(unittest.TestCase):
         topological_list = dag.topological_sort()
         logging.info(topological_list)
 
-        self.assertTrue(topological_list[0] == op5 or topological_list[0] == op4)
-        self.assertTrue(topological_list[1] == op4 or topological_list[1] == op5)
-        self.assertTrue(topological_list[2] == op1 or topological_list[2] == op2)
-        self.assertTrue(topological_list[3] == op1 or topological_list[3] == op2)
+        set1 = [op4, op5]
+        self.assertTrue(topological_list[0] in set1)
+        set1.remove(topological_list[0])
+
+        set2 = [op1, op2]
+        set2.extend(set1)
+        self.assertTrue(topological_list[1] in set2)
+        set2.remove(topological_list[1])
+
+        self.assertTrue(topological_list[2] in set2)
+        set2.remove(topological_list[2])
+
+        self.assertTrue(topological_list[3] in set2)
+
         self.assertTrue(topological_list[4] == op3)
 
         dag = DAG(


[24/28] incubator-airflow git commit: [AIRFLOW-900] Double trigger should not kill original task instance

Posted by bo...@apache.org.
[AIRFLOW-900] Double trigger should not kill original task instance

This updates the tests from an earlier AIRFLOW-900 change.

Closes #2146 from bolkedebruin/AIRFLOW-900


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2b26a5d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2b26a5d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2b26a5d9

Branch: refs/heads/v1-8-test
Commit: 2b26a5d95ce230b66255c8e7e7388c8013dc6ba6
Parents: 57faa53
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Mar 11 13:42:58 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:36:07 2017 -0700

----------------------------------------------------------------------
 tests/core.py                     | 58 -----------------------
 tests/dags/sleep_forever_dag.py   | 29 ------------
 tests/dags/test_double_trigger.py | 29 ++++++++++++
 tests/jobs.py                     | 86 ++++++++++++++++++++++++++++++++--
 4 files changed, 112 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 636ad43..870a0cb 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -896,64 +896,6 @@ class CoreTest(unittest.TestCase):
                 trigger_rule="non_existant",
                 dag=self.dag)
 
-    def test_run_task_twice(self):
-        """If two copies of a TI run, the new one should die, and old should live"""
-        dagbag = models.DagBag(
-            dag_folder=TEST_DAG_FOLDER,
-            include_examples=False,
-        )
-        TI = models.TaskInstance
-        dag = dagbag.dags.get('sleep_forever_dag')
-        task = dag.task_dict.get('sleeps_forever')
-    
-        ti = TI(task=task, execution_date=DEFAULT_DATE)
-        job1 = jobs.LocalTaskJob(
-            task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
-        job2 = jobs.LocalTaskJob(
-            task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
-
-        p1 = multiprocessing.Process(target=job1.run)
-        p2 = multiprocessing.Process(target=job2.run)
-        try:
-            p1.start()
-            start_time = timetime()
-            sleep(5.0) # must wait for session to be created on p1
-            settings.engine.dispose()
-            session = settings.Session()
-            ti.refresh_from_db(session=session)
-            self.assertEqual(State.RUNNING, ti.state)
-            p1pid = ti.pid
-            settings.engine.dispose()
-            p2.start()
-            p2.join(5) # wait 5 seconds until termination
-            self.assertFalse(p2.is_alive())
-            self.assertTrue(p1.is_alive())
-
-            settings.engine.dispose()
-            session = settings.Session()
-            ti.refresh_from_db(session=session)
-            self.assertEqual(State.RUNNING, ti.state)
-            self.assertEqual(p1pid, ti.pid)
-
-            # check changing hostname kills task
-            ti.refresh_from_db(session=session, lock_for_update=True)
-            ti.hostname = 'nonexistenthostname'
-            session.merge(ti)
-            session.commit()
-
-            p1.join(5)
-            self.assertFalse(p1.is_alive())
-        finally:
-            try:
-                p1.terminate()
-            except AttributeError:
-                pass # process already terminated
-            try:
-                p2.terminate()
-            except AttributeError:
-                pass # process already terminated
-            session.close()
-
     def test_terminate_task(self):
         """If a task instance's db state get deleted, it should fail"""
         TI = models.TaskInstance

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/dags/sleep_forever_dag.py
----------------------------------------------------------------------
diff --git a/tests/dags/sleep_forever_dag.py b/tests/dags/sleep_forever_dag.py
deleted file mode 100644
index b1f810e..0000000
--- a/tests/dags/sleep_forever_dag.py
+++ /dev/null
@@ -1,29 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Used for unit tests"""
-import airflow
-from airflow.operators.bash_operator import BashOperator
-from airflow.models import DAG
-
-dag = DAG(
-    dag_id='sleep_forever_dag',
-    schedule_interval=None,
-)
-
-task = BashOperator(
-    task_id='sleeps_forever',
-    dag=dag,
-    bash_command="sleep 10000000000",
-    start_date=airflow.utils.dates.days_ago(2),
-    owner='airflow')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/dags/test_double_trigger.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_double_trigger.py b/tests/dags/test_double_trigger.py
new file mode 100644
index 0000000..b58f5c9
--- /dev/null
+++ b/tests/dags/test_double_trigger.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from datetime import datetime
+
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+
+DEFAULT_DATE = datetime(2016, 1, 1)
+
+args = {
+    'owner': 'airflow',
+    'start_date': DEFAULT_DATE,
+}
+
+dag = DAG(dag_id='test_localtaskjob_double_trigger', default_args=args)
+task = DummyOperator(
+    task_id='test_localtaskjob_double_trigger_task',
+    dag=dag)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b26a5d9/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index d208fd4..aee0e9c 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -23,12 +23,13 @@ import os
 import shutil
 import unittest
 import six
-import sys
+import socket
 from tempfile import mkdtemp
 
 from airflow import AirflowException, settings, models
 from airflow.bin import cli
-from airflow.jobs import BackfillJob, SchedulerJob
+from airflow.executors import SequentialExecutor
+from airflow.jobs import BackfillJob, SchedulerJob, LocalTaskJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
@@ -36,8 +37,12 @@ from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
 from airflow.utils.dag_processing import SimpleDagBag
+
 from mock import patch
-from tests.executor.test_executor import TestExecutor
+from sqlalchemy.orm.session import make_transient
+from tests.executors.test_executor import TestExecutor
+
+from tests.core import TEST_DAG_FOLDER
 
 from airflow import configuration
 configuration.load_test_config()
@@ -344,6 +349,81 @@ class BackfillJobTest(unittest.TestCase):
                 self.assertEqual(State.NONE, ti.state)
 
 
+class LocalTaskJobTest(unittest.TestCase):
+    def setUp(self):
+        pass
+
+    @patch.object(LocalTaskJob, "_is_descendant_process")
+    def test_localtaskjob_heartbeat(self, is_descendant):
+        session = settings.Session()
+        dag = DAG(
+            'test_localtaskjob_heartbeat',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = DummyOperator(task_id='op1')
+
+        dag.clear()
+        dr = dag.create_dagrun(run_id="test",
+                               state=State.SUCCESS,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE,
+                               session=session)
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        ti.hostname = "blablabla"
+        session.commit()
+
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        self.assertRaises(AirflowException, job1.heartbeat_callback)
+
+        is_descendant.return_value = True
+        ti.state = State.RUNNING
+        ti.hostname = socket.getfqdn()
+        ti.pid = 1
+        session.merge(ti)
+        session.commit()
+
+        ret = job1.heartbeat_callback()
+        self.assertEqual(ret, None)
+
+        is_descendant.return_value = False
+        self.assertRaises(AirflowException, job1.heartbeat_callback)
+
+    def test_localtaskjob_double_trigger(self):
+        dagbag = models.DagBag(
+            dag_folder=TEST_DAG_FOLDER,
+            include_examples=False,
+        )
+        dag = dagbag.dags.get('test_localtaskjob_double_trigger')
+        task = dag.get_task('test_localtaskjob_double_trigger_task')
+
+        session = settings.Session()
+
+        dag.clear()
+        dr = dag.create_dagrun(run_id="test",
+                               state=State.SUCCESS,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE,
+                               session=session)
+        ti = dr.get_task_instance(task_id=task.task_id, session=session)
+        ti.state = State.RUNNING
+        ti.hostname = socket.getfqdn()
+        ti.pid = 1
+        session.commit()
+
+        ti_run = TI(task=task, execution_date=DEFAULT_DATE)
+        job1 = LocalTaskJob(task_instance=ti_run, ignore_ti_state=True, executor=SequentialExecutor())
+        self.assertRaises(AirflowException, job1.run)
+
+        ti = dr.get_task_instance(task_id=task.task_id, session=session)
+        self.assertEqual(ti.pid, 1)
+        self.assertEqual(ti.state, State.RUNNING)
+
+        session.close()
+
+
 class SchedulerJobTest(unittest.TestCase):
     # These defaults make the test faster to run
     default_scheduler_args = {"file_process_interval": 0,


[14/28] incubator-airflow git commit: [AIRFLOW-937] Improve performance of task_stats

Posted by bo...@apache.org.
[AIRFLOW-937] Improve performance of task_stats

Please accept this PR that addresses the following
issues:
- https://issues.apache.org/jira/browse/AIRFLOW-937

Testing Done:
- Shouldn't change functionality significantly,
should pass existing tests (if they exist)

This leads to slightly different results, but it reduced the time of this endpoint from 90s to 9s on our data, and the existing logic for task_ids was already incorrect (task_ids may not be distinct across DAGs).

Closes #2121 from saguziel/task-stats-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/66f39ca0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/66f39ca0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/66f39ca0

Branch: refs/heads/v1-8-test
Commit: 66f39ca0c3511da2ff86858ce7ea569d11adbd44
Parents: 0964f18
Author: Alex Guziel <al...@airbnb.com>
Authored: Thu Mar 2 14:04:49 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:21:13 2017 -0700

----------------------------------------------------------------------
 airflow/www/views.py | 16 +++++-----------
 1 file changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/66f39ca0/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d8acfef..d1a1f9a 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -497,26 +497,24 @@ class Airflow(BaseView):
 
     @expose('/task_stats')
     def task_stats(self):
-        task_ids = []
-        dag_ids = []
-        for dag in dagbag.dags.values():
-            task_ids += dag.task_ids
-            if not dag.is_subdag:
-                dag_ids.append(dag.dag_id)
-
         TI = models.TaskInstance
         DagRun = models.DagRun
+        Dag = models.DagModel
         session = Session()
 
         LastDagRun = (
             session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
+            .join(Dag, Dag.dag_id == DagRun.dag_id)
             .filter(DagRun.state != State.RUNNING)
+            .filter(Dag.is_active == 1)
             .group_by(DagRun.dag_id)
             .subquery('last_dag_run')
         )
         RunningDagRun = (
             session.query(DagRun.dag_id, DagRun.execution_date)
+            .join(Dag, Dag.dag_id == DagRun.dag_id)
             .filter(DagRun.state == State.RUNNING)
+            .filter(Dag.is_active == 1)
             .subquery('running_dag_run')
         )
 
@@ -527,16 +525,12 @@ class Airflow(BaseView):
             .join(LastDagRun, and_(
                 LastDagRun.c.dag_id == TI.dag_id,
                 LastDagRun.c.execution_date == TI.execution_date))
-            .filter(TI.task_id.in_(task_ids))
-            .filter(TI.dag_id.in_(dag_ids))
         )
         RunningTI = (
             session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
             .join(RunningDagRun, and_(
                 RunningDagRun.c.dag_id == TI.dag_id,
                 RunningDagRun.c.execution_date == TI.execution_date))
-            .filter(TI.task_id.in_(task_ids))
-            .filter(TI.dag_id.in_(dag_ids))
         )
 
         UnionTI = union_all(LastTI, RunningTI).alias('union_ti')
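
A small illustration of the correctness point made in the commit message (dag and task ids are illustrative): because the same task_id can appear in several DAGs, filtering on task_id alone does not scope task instances to the intended DAG, which is why the IN filters could be dropped once the joins restrict rows properly.

    task_instances = [
        {'dag_id': 'dag_a', 'task_id': 'extract'},
        {'dag_id': 'dag_b', 'task_id': 'extract'},  # same task_id, different DAG
    ]
    wanted_task_ids = {'extract'}  # collected while iterating over dag_a only

    matched = [ti for ti in task_instances if ti['task_id'] in wanted_task_ids]
    assert len(matched) == 2  # rows from dag_b slip through as well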


[15/28] incubator-airflow git commit: [AIRFLOW-938] Use test for True in task_stats queries

Posted by bo...@apache.org.
[AIRFLOW-938] Use test for True in task_stats queries

Fix a bug with the task_stats query on PostgreSQL, which doesn't support comparing a boolean column with == 1.

https://issues.apache.org/jira/browse/AIRFLOW-938

I've seen the other PR, but I'll try this method because I believe `__eq__(True)` is just `== True`, as documented at http://docs.sqlalchemy.org/en/latest/core/sqlelement.html#sqlalchemy.sql.expression.and_ (the underscore is part of the link).

Closes #2123 from saguziel/aguziel-fix-task-stats-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/157054e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/157054e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/157054e2

Branch: refs/heads/v1-8-test
Commit: 157054e2c9967e48fb3f3157081baf686dcee5e8
Parents: 66f39ca
Author: Alex Guziel <al...@airbnb.com>
Authored: Fri Mar 3 13:52:03 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:21:23 2017 -0700

----------------------------------------------------------------------
 airflow/www/views.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/157054e2/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d1a1f9a..962c1f0 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -506,7 +506,7 @@ class Airflow(BaseView):
             session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
             .join(Dag, Dag.dag_id == DagRun.dag_id)
             .filter(DagRun.state != State.RUNNING)
-            .filter(Dag.is_active == 1)
+            .filter(Dag.is_active == True)
             .group_by(DagRun.dag_id)
             .subquery('last_dag_run')
         )
@@ -514,7 +514,7 @@ class Airflow(BaseView):
             session.query(DagRun.dag_id, DagRun.execution_date)
             .join(Dag, Dag.dag_id == DagRun.dag_id)
             .filter(DagRun.state == State.RUNNING)
-            .filter(Dag.is_active == 1)
+            .filter(Dag.is_active == True)
             .subquery('running_dag_run')
         )
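
A hedged sketch of the comparison difference (assumes SQLAlchemy is installed; the table definition is illustrative): comparing a Boolean column with the Python literal True builds a boolean expression, whereas == 1 compares against an integer, which PostgreSQL typically rejects with "operator does not exist: boolean = integer".

    from sqlalchemy import Boolean, Column, Integer, MetaData, Table

    dag = Table('dag', MetaData(),
                Column('id', Integer, primary_key=True),
                Column('is_active', Boolean))

    # Mirrors the hunk above; the E712 lint warning is deliberate because
    # SQLAlchemy overloads __eq__ to build the filter expression.
    active_filter = dag.c.is_active == True  # noqa: E712
    print(active_filter)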
 


[16/28] incubator-airflow git commit: [AIRFLOW-719] Prevent DAGs from ending prematurely

Posted by bo...@apache.org.
[AIRFLOW-719] Prevent DAGs from ending prematurely

DAGs using ALL_SUCCESS and ONE_SUCCESS trigger rules were ending prematurely when upstream tasks were skipped. With this change, the ALL_SUCCESS and ONE_SUCCESS trigger rules encompass both SUCCESS and SKIPPED tasks.

Closes #2125 from dhuang/AIRFLOW-719


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4077c6de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4077c6de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4077c6de

Branch: refs/heads/v1-8-test
Commit: 4077c6de297566a4c598065867a9a27324ae6eb1
Parents: 157054e
Author: Daniel Huang <dx...@gmail.com>
Authored: Sat Mar 4 17:33:23 2017 +0100
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:27:30 2017 -0700

----------------------------------------------------------------------
 airflow/ti_deps/deps/trigger_rule_dep.py      |  6 +-
 tests/dags/test_dagrun_short_circuit_false.py | 38 +++++++++++
 tests/models.py                               | 79 +++++++++++++++++++---
 3 files changed, 111 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4077c6de/airflow/ti_deps/deps/trigger_rule_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 281ed51..da13bba 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -135,7 +135,7 @@ class TriggerRuleDep(BaseTIDep):
             if tr == TR.ALL_SUCCESS:
                 if upstream_failed or failed:
                     ti.set_state(State.UPSTREAM_FAILED, session)
-                elif skipped:
+                elif skipped == upstream:
                     ti.set_state(State.SKIPPED, session)
             elif tr == TR.ALL_FAILED:
                 if successes or skipped:
@@ -148,7 +148,7 @@ class TriggerRuleDep(BaseTIDep):
                     ti.set_state(State.SKIPPED, session)
 
         if tr == TR.ONE_SUCCESS:
-            if successes <= 0:
+            if successes <= 0 and skipped <= 0:
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires one upstream "
                     "task success, but none were found. "
@@ -162,7 +162,7 @@ class TriggerRuleDep(BaseTIDep):
                     "upstream_tasks_state={1}, upstream_task_ids={2}"
                     .format(tr, upstream_tasks_state, task.upstream_task_ids))
         elif tr == TR.ALL_SUCCESS:
-            num_failures = upstream - successes
+            num_failures = upstream - (successes + skipped)
             if num_failures > 0:
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires all upstream "

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4077c6de/tests/dags/test_dagrun_short_circuit_false.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_dagrun_short_circuit_false.py b/tests/dags/test_dagrun_short_circuit_false.py
new file mode 100644
index 0000000..805ab67
--- /dev/null
+++ b/tests/dags/test_dagrun_short_circuit_false.py
@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+
+from airflow.models import DAG
+from airflow.operators.python_operator import ShortCircuitOperator
+from airflow.operators.dummy_operator import DummyOperator
+
+
+# DAG that has its short circuit op fail and skip multiple downstream tasks
+dag = DAG(
+    dag_id='test_dagrun_short_circuit_false',
+    start_date=datetime(2017, 1, 1)
+)
+dag_task1 = ShortCircuitOperator(
+    task_id='test_short_circuit_false',
+    dag=dag,
+    python_callable=lambda: False)
+dag_task2 = DummyOperator(
+    task_id='test_state_skipped1',
+    dag=dag)
+dag_task3 = DummyOperator(
+    task_id='test_state_skipped2',
+    dag=dag)
+dag_task1.set_downstream(dag_task2)
+dag_task2.set_downstream(dag_task3)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4077c6de/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 7ca01e7..d904ff3 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -34,6 +34,7 @@ from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.utils.state import State
 from mock import patch
 from nose_parameterized import parameterized
+from tests.core import TEST_DAG_FOLDER
 
 DEFAULT_DATE = datetime.datetime(2016, 1, 1)
 TEST_DAGS_FOLDER = os.path.join(
@@ -117,13 +118,71 @@ class DagTest(unittest.TestCase):
         self.assertEqual(dag.dag_id, 'creating_dag_in_cm')
         self.assertEqual(dag.tasks[0].task_id, 'op6')
 
+
 class DagRunTest(unittest.TestCase):
+
+    def setUp(self):
+        self.dagbag = models.DagBag(dag_folder=TEST_DAG_FOLDER)
+
+    def create_dag_run(self, dag_id, state=State.RUNNING, task_states=None):
+        now = datetime.datetime.now()
+        dag = self.dagbag.get_dag(dag_id)
+        dag_run = dag.create_dagrun(
+            run_id='manual__' + now.isoformat(),
+            execution_date=now,
+            start_date=now,
+            state=State.RUNNING,
+            external_trigger=False,
+        )
+
+        if task_states is not None:
+            session = settings.Session()
+            for task_id, state in task_states.items():
+                ti = dag_run.get_task_instance(task_id)
+                ti.set_state(state, session)
+            session.close()
+
+        return dag_run
+
     def test_id_for_date(self):
         run_id = models.DagRun.id_for_date(
             datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None))
-        assert run_id == 'scheduled__2015-01-02T03:04:05', (
+        self.assertEqual(
+            'scheduled__2015-01-02T03:04:05', run_id,
             'Generated run_id did not match expectations: {0}'.format(run_id))
 
+    def test_dagrun_running_when_upstream_skipped(self):
+        """
+        Tests that a DAG run is not failed when an upstream task is skipped
+        """
+        initial_task_states = {
+            'test_short_circuit_false': State.SUCCESS,
+            'test_state_skipped1': State.SKIPPED,
+            'test_state_skipped2': State.NONE,
+        }
+        # dags/test_dagrun_short_circuit_false.py
+        dag_run = self.create_dag_run('test_dagrun_short_circuit_false',
+                                      state=State.RUNNING,
+                                      task_states=initial_task_states)
+        updated_dag_state = dag_run.update_state()
+        self.assertEqual(State.RUNNING, updated_dag_state)
+
+    def test_dagrun_success_when_all_skipped(self):
+        """
+        Tests that a DAG run succeeds when all tasks are skipped
+        """
+        initial_task_states = {
+            'test_short_circuit_false': State.SUCCESS,
+            'test_state_skipped1': State.SKIPPED,
+            'test_state_skipped2': State.SKIPPED,
+        }
+        # dags/test_dagrun_short_circuit_false.py
+        dag_run = self.create_dag_run('test_dagrun_short_circuit_false',
+                                      state=State.RUNNING,
+                                      task_states=initial_task_states)
+        updated_dag_state = dag_run.update_state()
+        self.assertEqual(State.SUCCESS, updated_dag_state)
+
 
 class DagBagTest(unittest.TestCase):
 
@@ -501,7 +560,7 @@ class TaskInstanceTest(unittest.TestCase):
         self.assertEqual(dt, ti.end_date+max_delay)
 
     def test_depends_on_past(self):
-        dagbag = models.DagBag()
+        dagbag = models.DagBag(dag_folder=TEST_DAG_FOLDER)
         dag = dagbag.get_dag('test_depends_on_past')
         dag.clear()
         task = dag.tasks[0]
@@ -530,10 +589,11 @@ class TaskInstanceTest(unittest.TestCase):
         #
         # Tests for all_success
         #
-        ['all_success', 5, 0, 0, 0, 0, True, None, True],
-        ['all_success', 2, 0, 0, 0, 0, True, None, False],
-        ['all_success', 2, 0, 1, 0, 0, True, ST.UPSTREAM_FAILED, False],
-        ['all_success', 2, 1, 0, 0, 0, True, ST.SKIPPED, False],
+        ['all_success', 5, 0, 0, 0, 5, True, None, True],
+        ['all_success', 2, 0, 0, 0, 2, True, None, False],
+        ['all_success', 2, 0, 1, 0, 3, True, ST.UPSTREAM_FAILED, False],
+        ['all_success', 2, 1, 0, 0, 3, True, None, False],
+        ['all_success', 0, 5, 0, 0, 5, True, ST.SKIPPED, True],
         #
         # Tests for one_success
         #
@@ -541,6 +601,7 @@ class TaskInstanceTest(unittest.TestCase):
         ['one_success', 2, 0, 0, 0, 2, True, None, True],
         ['one_success', 2, 0, 1, 0, 3, True, None, True],
         ['one_success', 2, 1, 0, 0, 3, True, None, True],
+        ['one_success', 0, 2, 0, 0, 2, True, None, True],
         #
         # Tests for all_failed
         #
@@ -552,9 +613,9 @@ class TaskInstanceTest(unittest.TestCase):
         #
         # Tests for one_failed
         #
-        ['one_failed', 5, 0, 0, 0, 0, True, None, False],
-        ['one_failed', 2, 0, 0, 0, 0, True, None, False],
-        ['one_failed', 2, 0, 1, 0, 0, True, None, True],
+        ['one_failed', 5, 0, 0, 0, 5, True, ST.SKIPPED, False],
+        ['one_failed', 2, 0, 0, 0, 2, True, None, False],
+        ['one_failed', 2, 0, 1, 0, 2, True, None, True],
         ['one_failed', 2, 1, 0, 0, 3, True, None, False],
         ['one_failed', 2, 3, 0, 0, 5, True, ST.SKIPPED, False],
         #
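
A hedged sketch of the counting change the trigger_rule_dep.py hunk makes (plain integers rather than Airflow's dep-context objects): skipped upstream tasks no longer count toward failures for ALL_SUCCESS, and they now satisfy ONE_SUCCESS.

    def all_success_satisfied(upstream, successes, skipped):
        # num_failures = upstream - (successes + skipped)
        return upstream - (successes + skipped) == 0

    def one_success_satisfied(successes, skipped):
        return successes > 0 or skipped > 0

    assert all_success_satisfied(upstream=5, successes=3, skipped=2)
    assert one_success_satisfied(successes=0, skipped=2)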


[25/28] incubator-airflow git commit: Make compatible with 1.8

Posted by bo...@apache.org.
Make compatible with 1.8


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8df046bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8df046bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8df046bf

Branch: refs/heads/v1-8-test
Commit: 8df046bfbec670a253139c83c6174bb88f25ee7f
Parents: 2b26a5d
Author: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Authored: Sun Mar 12 10:11:15 2017 -0700
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 10:11:15 2017 -0700

----------------------------------------------------------------------
 tests/executors/__init__.py      | 13 ++++++++
 tests/executors/test_executor.py | 56 +++++++++++++++++++++++++++++++++++
 2 files changed, 69 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8df046bf/tests/executors/__init__.py
----------------------------------------------------------------------
diff --git a/tests/executors/__init__.py b/tests/executors/__init__.py
new file mode 100644
index 0000000..a85b772
--- /dev/null
+++ b/tests/executors/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8df046bf/tests/executors/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/test_executor.py b/tests/executors/test_executor.py
new file mode 100644
index 0000000..9ec6cd4
--- /dev/null
+++ b/tests/executors/test_executor.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from airflow.executors.base_executor import BaseExecutor
+from airflow.utils.state import State
+
+from airflow import settings
+
+
+class TestExecutor(BaseExecutor):
+    """
+    TestExecutor is used for unit testing purposes.
+    """
+    def __init__(self, do_update=False, *args, **kwargs):
+        self.do_update = do_update
+        self._running = []
+        self.history = []
+
+        super(TestExecutor, self).__init__(*args, **kwargs)
+
+    def execute_async(self, key, command, queue=None):
+        self.logger.debug("{} running task instances".format(len(self.running)))
+        self.logger.debug("{} in queue".format(len(self.queued_tasks)))
+
+    def heartbeat(self):
+        session = settings.Session()
+        if self.do_update:
+            self.history.append(list(self.queued_tasks.values()))
+            while len(self._running) > 0:
+                ti = self._running.pop()
+                ti.set_state(State.SUCCESS, session)
+            for key, val in list(self.queued_tasks.items()):
+                (command, priority, queue, ti) = val
+                ti.set_state(State.RUNNING, session)
+                self._running.append(ti)
+                self.queued_tasks.pop(key)
+
+        session.commit()
+        session.close()
+
+    def terminate(self):
+        pass
+
+    def end(self):
+        self.sync()
+


[20/28] incubator-airflow git commit: [AIRFLOW-961] run onkill when SIGTERMed

Posted by bo...@apache.org.
[AIRFLOW-961] run onkill when SIGTERMed

Closes #2138 from saguziel/aguziel-sigterm


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dacc69a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dacc69a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dacc69a5

Branch: refs/heads/v1-8-test
Commit: dacc69a504cbfcdba5e2b24220fa1982637b17d3
Parents: dcc8ede
Author: Alex Guziel <al...@airbnb.com>
Authored: Sat Mar 11 10:43:49 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:34:09 2017 -0700

----------------------------------------------------------------------
 airflow/jobs.py | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dacc69a5/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index b6913f3..36548c2 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2060,6 +2060,14 @@ class LocalTaskJob(BaseJob):
 
     def _execute(self):
         self.task_runner = get_task_runner(self)
+
+        def signal_handler(signum, frame):
+            '''Setting kill signal handler'''
+            logging.error("Killing subprocess")
+            self.on_kill()
+            raise AirflowException("LocalTaskJob received SIGTERM signal")
+        signal.signal(signal.SIGTERM, signal_handler)
+
         try:
             self.task_runner.start()
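
A standalone sketch of the signal pattern in the hunk (a plain function rather than Airflow's LocalTaskJob): register a SIGTERM handler that runs the cleanup hook and then raises, so the surrounding try/except can unwind and report the failure.

    import logging
    import signal

    def install_sigterm_handler(on_kill):
        def handler(signum, frame):
            logging.error("Received SIGTERM, running on_kill")
            on_kill()
            raise RuntimeError("received SIGTERM signal")
        signal.signal(signal.SIGTERM, handler)

    # install_sigterm_handler(lambda: logging.info("terminating task subprocess"))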
 


[21/28] incubator-airflow git commit: [AIRFLOW-932][AIRFLOW-921][AIRFLOW-910] Do not mark tasks removed when backfilling

Posted by bo...@apache.org.
[AIRFLOW-932][AIRFLOW-921][AIRFLOW-910] Do not mark tasks removed when backfilling

In a backfill one can specify a specific task to execute. We create a subset of the original tasks as a subdag derived from the original DAG. The subdag has the same name as the original DAG. This breaks the integrity check of a dag_run, as tasks are suddenly no longer in scope.

Closes #2122 from bolkedebruin/AIRFLOW-921


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a8f2c27e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a8f2c27e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a8f2c27e

Branch: refs/heads/v1-8-test
Commit: a8f2c27ed44449e6611c7c4a9ec8cf2371cf0987
Parents: dacc69a
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Mar 11 10:52:07 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:34:22 2017 -0700

----------------------------------------------------------------------
 airflow/jobs.py   |  1 +
 airflow/models.py | 12 +++++++++++-
 tests/jobs.py     | 49 +++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a8f2c27e/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 36548c2..c61b229 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1803,6 +1803,7 @@ class BackfillJob(BaseJob):
 
             # explicitly mark running as we can fill gaps
             run.state = State.RUNNING
+            run.run_id = run_id
             run.verify_integrity(session=session)
 
             # check if we have orphaned tasks

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a8f2c27e/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index e63da3e..32c52ac 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2681,6 +2681,8 @@ class DAG(BaseDag, LoggingMixin):
         self.orientation = orientation
         self.catchup = catchup
 
+        self.partial = False
+
         self._comps = {
             'dag_id',
             'task_ids',
@@ -3186,6 +3188,10 @@ class DAG(BaseDag, LoggingMixin):
                 tid for tid in t._upstream_task_ids if tid in dag.task_ids]
             t._downstream_task_ids = [
                 tid for tid in t._downstream_task_ids if tid in dag.task_ids]
+
+        if len(dag.tasks) < len(self.tasks):
+            dag.partial = True
+
         return dag
 
     def has_task(self, task_id):
@@ -3946,6 +3952,9 @@ class DagRun(Base):
                 else:
                     tis = tis.filter(TI.state.in_(state))
 
+        if self.dag and self.dag.partial:
+            tis = tis.filter(TI.task_id.in_(self.dag.task_ids))
+
         return tis.all()
 
     @provide_session
@@ -4006,6 +4015,7 @@ class DagRun(Base):
         """
 
         dag = self.get_dag()
+
         tis = self.get_task_instances(session=session)
 
         logging.info("Updating state for {} considering {} task(s)"
@@ -4090,7 +4100,7 @@ class DagRun(Base):
             try:
                 dag.get_task(ti.task_id)
             except AirflowException:
-                if self.state is not State.RUNNING:
+                if self.state is not State.RUNNING and not dag.partial:
                     ti.state = State.REMOVED
 
         # check for missing tasks

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a8f2c27e/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 1acf269..d208fd4 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -42,6 +42,8 @@ from tests.executor.test_executor import TestExecutor
 from airflow import configuration
 configuration.load_test_config()
 
+import sqlalchemy
+
 try:
     from unittest import mock
 except ImportError:
@@ -294,6 +296,53 @@ class BackfillJobTest(unittest.TestCase):
         self.assertEqual(ti.state, State.SUCCESS)
         dag.clear()
 
+    def test_sub_set_subdag(self):
+        dag = DAG(
+            'test_sub_set_subdag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = DummyOperator(task_id='leave1')
+            op2 = DummyOperator(task_id='leave2')
+            op3 = DummyOperator(task_id='upstream_level_1')
+            op4 = DummyOperator(task_id='upstream_level_2')
+            op5 = DummyOperator(task_id='upstream_level_3')
+            # order randomly
+            op2.set_downstream(op3)
+            op1.set_downstream(op3)
+            op4.set_downstream(op5)
+            op3.set_downstream(op4)
+
+        dag.clear()
+        dr = dag.create_dagrun(run_id="test",
+                               state=State.SUCCESS,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE)
+
+        executor = TestExecutor(do_update=True)
+        sub_dag = dag.sub_dag(task_regex="leave*",
+                              include_downstream=False,
+                              include_upstream=False)
+        job = BackfillJob(dag=sub_dag,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE,
+                          executor=executor)
+        job.run()
+
+        self.assertRaises(sqlalchemy.orm.exc.NoResultFound, dr.refresh_from_db)
+        # the run_id should have changed, so a refresh won't work
+        drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE)
+        dr = drs[0]
+
+        self.assertEqual(BackfillJob.ID_FORMAT_PREFIX.format(DEFAULT_DATE.isoformat()),
+                         dr.run_id)
+        for ti in dr.get_task_instances():
+            if ti.task_id == 'leave1' or ti.task_id == 'leave2':
+                self.assertEqual(State.SUCCESS, ti.state)
+            else:
+                self.assertEqual(State.NONE, ti.state)
+
 
 class SchedulerJobTest(unittest.TestCase):
     # These defaults make the test faster to run


[18/28] incubator-airflow git commit: [AIRFLOW-967] Wrap strings in native for py2 ldap compatibility

Posted by bo...@apache.org.
[AIRFLOW-967] Wrap strings in native for py2 ldap compatibility

ldap3 has issues with newstr being passed. This wraps any call that
goes over the wire to the ldap server in native() to ensure the native
string type is used.

Closes #2141 from bolkedebruin/AIRFLOW-967


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8ffaadf1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8ffaadf1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8ffaadf1

Branch: refs/heads/v1-8-test
Commit: 8ffaadf173e1cd46661a592ad55b0d41e460c05a
Parents: 1f3aead
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Mar 10 12:00:16 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:32:02 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/auth/backends/ldap_auth.py | 26 +++++++++++++++----------
 1 file changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ffaadf1/airflow/contrib/auth/backends/ldap_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py
index 24a63bc..13b49f9 100644
--- a/airflow/contrib/auth/backends/ldap_auth.py
+++ b/airflow/contrib/auth/backends/ldap_auth.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from future.utils import native
 
 import flask_login
 from flask_login import login_required, current_user, logout_user
@@ -60,7 +61,7 @@ def get_ldap_connection(dn=None, password=None):
         pass
 
     server = Server(configuration.get("ldap", "uri"), use_ssl, tls_configuration)
-    conn = Connection(server, dn, password)
+    conn = Connection(server, native(dn), native(password))
 
     if not conn.bind():
         LOG.error("Cannot bind to ldap server: %s ", conn.last_error)
@@ -71,14 +72,15 @@ def get_ldap_connection(dn=None, password=None):
 
 def group_contains_user(conn, search_base, group_filter, user_name_attr, username):
     search_filter = '(&({0}))'.format(group_filter)
-    if not conn.search(search_base, search_filter, attributes=[user_name_attr]):
-        LOG.warn("Unable to find group for %s %s", search_base, search_filter)
+    if not conn.search(native(search_base), native(search_filter),
+                       attributes=[native(user_name_attr)]):
+        LOG.warning("Unable to find group for %s %s", search_base, search_filter)
     else:
         for resp in conn.response:
             if (
-                'attributes' in resp and (
-                    resp['attributes'].get(user_name_attr)[0] == username or
-                    resp['attributes'].get(user_name_attr) == username
+                        'attributes' in resp and (
+                            resp['attributes'].get(user_name_attr)[0] == username or
+                            resp['attributes'].get(user_name_attr) == username
                 )
             ):
                 return True
@@ -87,7 +89,7 @@ def group_contains_user(conn, search_base, group_filter, user_name_attr, username):
 
 def groups_user(conn, search_base, user_filter, user_name_att, username):
     search_filter = "(&({0})({1}={2}))".format(user_filter, user_name_att, username)
-    res = conn.search(search_base, search_filter, attributes=["memberOf"])
+    res = conn.search(native(search_base), native(search_filter), attributes=[native("memberOf")])
     if not res:
         LOG.info("Cannot find user %s", username)
         raise AuthenticationError("Invalid username or password")
@@ -118,7 +120,8 @@ class LdapUser(models.User):
         self.ldap_groups = []
 
         # Load and cache superuser and data_profiler settings.
-        conn = get_ldap_connection(configuration.get("ldap", "bind_user"), configuration.get("ldap", "bind_password"))
+        conn = get_ldap_connection(configuration.get("ldap", "bind_user"),
+                                   configuration.get("ldap", "bind_password"))
         try:
             self.superuser = group_contains_user(conn,
                                                  configuration.get("ldap", "basedn"),
@@ -151,7 +154,8 @@ class LdapUser(models.User):
 
     @staticmethod
     def try_login(username, password):
-        conn = get_ldap_connection(configuration.get("ldap", "bind_user"), configuration.get("ldap", "bind_password"))
+        conn = get_ldap_connection(configuration.get("ldap", "bind_user"),
+                                   configuration.get("ldap", "bind_password"))
 
         search_filter = "(&({0})({1}={2}))".format(
             configuration.get("ldap", "user_filter"),
@@ -171,7 +175,9 @@ class LdapUser(models.User):
 
         # todo: BASE or ONELEVEL?
 
-        res = conn.search(configuration.get("ldap", "basedn"), search_filter, search_scope=search_scope)
+        res = conn.search(native(configuration.get("ldap", "basedn")),
+                          native(search_filter),
+                          search_scope=native(search_scope))
 
         # todo: use list or result?
         if not res:
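
A minimal sketch of the pattern this commit applies, assuming the future library's
native() helper and the ldap3 Server/Connection API used above (the error handling
here is a stand-in, not the backend's AuthenticationError):

    # Hedged sketch: wrap every string that crosses the wire in native() so
    # that, on Python 2, ldap3 receives the native str type instead of a
    # future 'newstr' instance.
    from future.utils import native
    from ldap3 import Server, Connection

    def bind(uri, dn, password):
        server = Server(uri)
        conn = Connection(server, native(dn), native(password))
        if not conn.bind():
            raise RuntimeError(conn.last_error)
        return conn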


[06/28] incubator-airflow git commit: [AIRFLOW-856] Make sure execution date is set for local client

Posted by bo...@apache.org.
[AIRFLOW-856] Make sure execution date is set for local client

In the local API client the execution date was hard-coded to None.
Secondly, when no execution date was specified, the execution date was
set to datetime.now(). datetime.now() includes the fractional seconds
that are supported in the database, but they are not supported in,
among others, the current logging setup. Now we cut off the fractional
seconds from the execution date.

Closes #2064 from bolkedebruin/AIRFLOW-856
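
A minimal sketch of the execution-date handling described in the message above;
the function name is illustrative, not the client's actual API:

    # Hedged sketch: default the execution date and strip the fractional
    # seconds, since consumers such as the logging setup cannot handle
    # microseconds even though the database can store them.
    from datetime import datetime

    def resolve_execution_date(execution_date=None):
        if execution_date is None:
            execution_date = datetime.now()
        return execution_date.replace(microsecond=0)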


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3918e5e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3918e5e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3918e5e1

Branch: refs/heads/v1-8-test
Commit: 3918e5e1c489bf01a6a836d1d76e2251137af5de
Parents: 3b1e81a
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Feb 10 14:17:26 2017 +0100
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:09:43 2017 -0700

----------------------------------------------------------------------
 airflow/api/client/local_client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3918e5e1/airflow/api/client/local_client.py
----------------------------------------------------------------------
diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py
index 05f27f6..5422aa3 100644
--- a/airflow/api/client/local_client.py
+++ b/airflow/api/client/local_client.py
@@ -1,4 +1,4 @@
-# -*- coding: utf-8 -*-
+    # -*- coding: utf-8 -*-
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.