You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/25 22:00:06 UTC

[1/6] incubator-airflow git commit: [AIRFLOW-139] Let psycopg2 handle autocommit for PostgresHook

Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test 1ea4c533f -> 76cbfeb3a


[AIRFLOW-139] Let psycopg2 handle autocommit for PostgresHook

The server-side autocommit setting was removed and reimplemented
in client applications and languages. Server-side autocommit was
causing too many problems with languages and applications that
wanted to control their own autocommit behavior, so autocommit was
removed from the server and added to individual client APIs as appropriate.

Closes #1821 from danielzohar/AIRFLOW-139_vacuum_operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ac9167f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ac9167f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ac9167f3

Branch: refs/heads/v1-8-test
Commit: ac9167f37b586f9ece381763b91a0ee25d736f38
Parents: a2b0ea3
Author: Daniel Zohar <i...@danielzohar.com>
Authored: Tue Jan 24 15:45:39 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 24 15:45:48 2017 +0100

----------------------------------------------------------------------
 airflow/hooks/postgres_hook.py |  4 +---
 tests/operators/operators.py   | 24 ++++++++++++++++++------
 2 files changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac9167f3/airflow/hooks/postgres_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/postgres_hook.py b/airflow/hooks/postgres_hook.py
index d096d75..75c8226 100644
--- a/airflow/hooks/postgres_hook.py
+++ b/airflow/hooks/postgres_hook.py
@@ -26,7 +26,7 @@ class PostgresHook(DbApiHook):
     '''
     conn_name_attr = 'postgres_conn_id'
     default_conn_name = 'postgres_default'
-    supports_autocommit = False
+    supports_autocommit = True
 
     def get_conn(self):
         conn = self.get_connection(self.postgres_conn_id)
@@ -41,8 +41,6 @@ class PostgresHook(DbApiHook):
             if arg_name in ['sslmode', 'sslcert', 'sslkey', 'sslrootcert', 'sslcrl', 'application_name']:
                 conn_args[arg_name] = arg_val
         psycopg2_conn = psycopg2.connect(**conn_args)
-        if psycopg2_conn.server_version < 70400:
-            self.supports_autocommit = True
         return psycopg2_conn
 
     @staticmethod

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac9167f3/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 7458827..7aaf12e 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -15,18 +15,14 @@
 from __future__ import print_function
 
 import datetime
-import os
-import unittest
-import six
 
-from airflow import DAG, configuration, operators, utils
+from airflow import DAG, configuration, operators
 from airflow.utils.tests import skipUnlessImported
+
 configuration.load_test_config()
 
-import os
 import unittest
 
-
 DEFAULT_DATE = datetime.datetime(2015, 1, 1)
 DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
 DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
@@ -118,6 +114,7 @@ class MySqlTest(unittest.TestCase):
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+
 @skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator')
 class PostgresTest(unittest.TestCase):
     def setUp(self):
@@ -182,6 +179,21 @@ class PostgresTest(unittest.TestCase):
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_vacuum(self):
+        """
+        Verifies the VACUUM operation runs well with the PostgresOperator
+        """
+        import airflow.operators.postgres_operator
+
+        sql = "VACUUM ANALYZE;"
+        t = operators.postgres_operator.PostgresOperator(
+            task_id='postgres_operator_test_vacuum',
+            sql=sql,
+            dag=self.dag,
+            autocommit=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+
 @skipUnlessImported('airflow.operators.hive_operator', 'HiveOperator')
 @skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator')
 class TransferTests(unittest.TestCase):


[5/6] incubator-airflow git commit: [AIRFLOW-807] Improve scheduler performance for large DAGs

Posted by bo...@apache.org.
[AIRFLOW-807] Improve scheduler performance for large DAGs

MySQL's query optimizer selects the wrong index, which
has a significant impact on the performance of the
scheduler.

Closes #2021 from criccomini/AIRFLOW-807


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6b2a3ca2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6b2a3ca2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6b2a3ca2

Branch: refs/heads/v1-8-test
Commit: 6b2a3ca2ee4ee3415ef72ea1fa3fc694350e9efc
Parents: 5479ac8
Author: Chris Riccomini <ch...@wepay.com>
Authored: Wed Jan 25 22:54:09 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jan 25 22:54:18 2017 +0100

----------------------------------------------------------------------
 airflow/jobs.py                                 |  6 ++++
 ...7_add_dag_id_state_index_on_dag_run_table.py | 37 ++++++++++++++++++++
 2 files changed, 43 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6b2a3ca2/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 0ac3607..201d87f 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -538,12 +538,18 @@ class SchedulerJob(BaseJob):
         Where assuming that the scheduler runs often, so we only check for
         tasks that should have succeeded in the past hour.
         """
+        if not any([ti.sla for ti in dag.tasks]):
+            self.logger.info("Skipping SLA check for {} because "
+              "no tasks in DAG have SLAs".format(dag))
+            return
+
         TI = models.TaskInstance
         sq = (
             session
             .query(
                 TI.task_id,
                 func.max(TI.execution_date).label('max_ti'))
+            .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
             .filter(TI.dag_id == dag.dag_id)
             .filter(TI.state == State.SUCCESS)
             .filter(TI.task_id.in_(dag.task_ids))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6b2a3ca2/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
new file mode 100644
index 0000000..c500966
--- /dev/null
+++ b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
@@ -0,0 +1,37 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Add dag_id/state index on dag_run table
+
+Revision ID: 127d2bf2dfa7
+Revises: 1a5a9e6bf2b5
+Create Date: 2017-01-25 11:43:51.635667
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '127d2bf2dfa7'
+down_revision = '1a5a9e6bf2b5'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+
+def upgrade():
+    op.create_index('dag_id_state', 'dag_run', ['dag_id', 'state'], unique=False)
+
+
+def downgrade():
+    op.drop_index('dag_id_state', table_name='dag_run')
+


[6/6] incubator-airflow git commit: Merge branch 'master' into v1-8-test

Posted by bo...@apache.org.
Merge branch 'master' into v1-8-test


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/76cbfeb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/76cbfeb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/76cbfeb3

Branch: refs/heads/v1-8-test
Commit: 76cbfeb3af05d2eaf2378430a0092454fbb07e03
Parents: 1ea4c53 6b2a3ca
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Jan 25 22:59:50 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jan 25 22:59:50 2017 +0100

----------------------------------------------------------------------
 airflow/hooks/postgres_hook.py                  |  4 +-
 airflow/jobs.py                                 | 39 ++++++++++----------
 ...7_add_dag_id_state_index_on_dag_run_table.py | 37 +++++++++++++++++++
 .../1a5a9e6bf2b5_add_state_index_for_dagruns.py | 37 -------------------
 tests/operators/operators.py                    | 24 +++++++++---
 5 files changed, 75 insertions(+), 66 deletions(-)
----------------------------------------------------------------------



[3/6] incubator-airflow git commit: [AIRFLOW-803] Revert join with dag_runs in _execute_task_instances

Posted by bo...@apache.org.
[AIRFLOW-803] Revert join with dag_runs in _execute_task_instances

TaskInstances will be set to 'scheduled' if they meet the criteria to run,
including the ones up for retry. No task_instance will be sent to the executor
in any state other than 'scheduled'.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4edf9138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4edf9138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4edf9138

Branch: refs/heads/v1-8-test
Commit: 4edf9138d03fa4cbce5a1fc9059735d6f80f80f2
Parents: ac9167f
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Jan 25 11:54:30 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jan 25 13:29:14 2017 +0100

----------------------------------------------------------------------
 airflow/jobs.py                                 |  6 +---
 .../1a5a9e6bf2b5_add_state_index_for_dagruns.py | 37 --------------------
 2 files changed, 1 insertion(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4edf9138/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 8bb93bb..3e3229f 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -34,7 +34,7 @@ import time
 from time import sleep
 
 import psutil
-from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_
+from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import make_transient
 from tabulate import tabulate
@@ -955,10 +955,6 @@ class SchedulerJob(BaseJob):
             .query(TI)
             .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
             .filter(TI.state.in_(states))
-            .join(DagRun, and_(TI.dag_id == DagRun.dag_id,
-                               TI.execution_date == DagRun.execution_date,
-                               DagRun.state == State.RUNNING,
-                               DagRun.run_id.like(DagRun.ID_PREFIX + '%')))
             .all()
         )
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4edf9138/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py b/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py
deleted file mode 100644
index 29ffaf1..0000000
--- a/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Add state index for dagruns to allow the quick lookup of active dagruns
-
-Revision ID: 1a5a9e6bf2b5
-Revises: 5e7d17757c7a
-Create Date: 2017-01-17 10:22:53.193711
-
-"""
-
-# revision identifiers, used by Alembic.
-revision = '1a5a9e6bf2b5'
-down_revision = '5e7d17757c7a'
-branch_labels = None
-depends_on = None
-
-from alembic import op
-import sqlalchemy as sa
-
-
-def upgrade():
-    op.create_index('dr_state', 'dag_run', ['state'], unique=False)
-
-
-def downgrade():
-    op.drop_index('state', table_name='dag_run')


[4/6] incubator-airflow git commit: Merge branch 'fix_localtaskjob'

Posted by bo...@apache.org.
Merge branch 'fix_localtaskjob'


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5479ac8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5479ac8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5479ac8d

Branch: refs/heads/v1-8-test
Commit: 5479ac8d4ab9bbb4bab839df9f0d5cf0c0e80388
Parents: 4edf913 24d641b
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Jan 25 21:36:28 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jan 25 21:36:28 2017 +0100

----------------------------------------------------------------------
 airflow/jobs.py | 27 ++++++++++++---------------
 1 file changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5479ac8d/airflow/jobs.py
----------------------------------------------------------------------


[2/6] incubator-airflow git commit: [AIRFLOW-798] Check return_code before forcing termination

Posted by bo...@apache.org.
[AIRFLOW-798] Check return_code before forcing termination

LocalTaskJob could still log an error and self-destruct,
although the underlying process had already exited.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/24d641bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/24d641bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/24d641bc

Branch: refs/heads/v1-8-test
Commit: 24d641bc106c112f86771bd394d877dd4df578f9
Parents: a2b0ea3
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Jan 24 12:01:44 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 24 16:40:22 2017 +0100

----------------------------------------------------------------------
 airflow/jobs.py | 27 ++++++++++++---------------
 1 file changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/24d641bc/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 8bb93bb..978fc35 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2081,28 +2081,25 @@ class LocalTaskJob(BaseJob):
             # task is already terminating, let it breathe
             return
 
-        # Suicide pill
-        TI = models.TaskInstance
+        self.task_instance.refresh_from_db()
         ti = self.task_instance
-        new_ti = session.query(TI).filter(
-            TI.dag_id == ti.dag_id, TI.task_id == ti.task_id,
-            TI.execution_date == ti.execution_date).scalar()
-        if new_ti.state == State.RUNNING:
+        if ti.state == State.RUNNING:
             self.was_running = True
             fqdn = socket.getfqdn()
-            if not (fqdn == new_ti.hostname and
-                    self.task_runner.process.pid == new_ti.pid):
-                logging.warning("Recorded hostname and pid of {new_ti.hostname} "
-                                "and {new_ti.pid} do not match this instance's "
+            if not (fqdn == ti.hostname and
+                    self.task_runner.process.pid == ti.pid):
+                logging.warning("Recorded hostname and pid of {ti.hostname} "
+                                "and {ti.pid} do not match this instance's "
                                 "which are {fqdn} and "
-                                "{self.task_runner.process.pid}. Taking the poison pill. "
-                                "So long."
+                                "{self.task_runner.process.pid}. "
+                                "Taking the poison pill. So long."
                                 .format(**locals()))
                 raise AirflowException("Another worker/process is running this job")
-        elif self.was_running and hasattr(self.task_runner, 'process'):
+        elif (self.was_running
+              and self.task_runner.return_code() is None
+              and hasattr(self.task_runner, 'process')):
             logging.warning(
                 "State of this instance has been externally set to "
-                "{self.task_instance.state}. "
-                "Taking the poison pill. So long.".format(**locals()))
+                "{}. Taking the poison pill. So long.".format(ti.state))
             self.task_runner.terminate()
             self.terminating = True