You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/25 22:00:06 UTC
[1/6] incubator-airflow git commit: [AIRFLOW-139] Let psycopg2 handle
autocommit for PostgresHook
Repository: incubator-airflow
Updated Branches:
refs/heads/v1-8-test 1ea4c533f -> 76cbfeb3a
[AIRFLOW-139] Let psycopg2 handle autocommit for PostgresHook
The server-side autocommit setting was removed and reimplemented
in client applications and languages. Server-side autocommit was
causing too many problems with languages and applications that
wanted to control their own autocommit behavior, so autocommit was
removed from the server and added to individual client APIs as appropriate.
Closes #1821 from danielzohar/AIRFLOW-139_vacuum_operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ac9167f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ac9167f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ac9167f3
Branch: refs/heads/v1-8-test
Commit: ac9167f37b586f9ece381763b91a0ee25d736f38
Parents: a2b0ea3
Author: Daniel Zohar <i...@danielzohar.com>
Authored: Tue Jan 24 15:45:39 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 24 15:45:48 2017 +0100
----------------------------------------------------------------------
airflow/hooks/postgres_hook.py | 4 +---
tests/operators/operators.py | 24 ++++++++++++++++++------
2 files changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac9167f3/airflow/hooks/postgres_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/postgres_hook.py b/airflow/hooks/postgres_hook.py
index d096d75..75c8226 100644
--- a/airflow/hooks/postgres_hook.py
+++ b/airflow/hooks/postgres_hook.py
@@ -26,7 +26,7 @@ class PostgresHook(DbApiHook):
'''
conn_name_attr = 'postgres_conn_id'
default_conn_name = 'postgres_default'
- supports_autocommit = False
+ supports_autocommit = True
def get_conn(self):
conn = self.get_connection(self.postgres_conn_id)
@@ -41,8 +41,6 @@ class PostgresHook(DbApiHook):
if arg_name in ['sslmode', 'sslcert', 'sslkey', 'sslrootcert', 'sslcrl', 'application_name']:
conn_args[arg_name] = arg_val
psycopg2_conn = psycopg2.connect(**conn_args)
- if psycopg2_conn.server_version < 70400:
- self.supports_autocommit = True
return psycopg2_conn
@staticmethod
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac9167f3/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 7458827..7aaf12e 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -15,18 +15,14 @@
from __future__ import print_function
import datetime
-import os
-import unittest
-import six
-from airflow import DAG, configuration, operators, utils
+from airflow import DAG, configuration, operators
from airflow.utils.tests import skipUnlessImported
+
configuration.load_test_config()
-import os
import unittest
-
DEFAULT_DATE = datetime.datetime(2015, 1, 1)
DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
@@ -118,6 +114,7 @@ class MySqlTest(unittest.TestCase):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
@skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator')
class PostgresTest(unittest.TestCase):
def setUp(self):
@@ -182,6 +179,21 @@ class PostgresTest(unittest.TestCase):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ def test_vacuum(self):
+ """
+ Verifies the VACUUM operation runs well with the PostgresOperator
+ """
+ import airflow.operators.postgres_operator
+
+ sql = "VACUUM ANALYZE;"
+ t = operators.postgres_operator.PostgresOperator(
+ task_id='postgres_operator_test_vacuum',
+ sql=sql,
+ dag=self.dag,
+ autocommit=True)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+
@skipUnlessImported('airflow.operators.hive_operator', 'HiveOperator')
@skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator')
class TransferTests(unittest.TestCase):
[5/6] incubator-airflow git commit: [AIRFLOW-807] Improve scheduler
performance for large DAGs
Posted by bo...@apache.org.
[AIRFLOW-807] Improve scheduler performance for large DAGs
MySQL's query optimizer selects the wrong index, which
has a significant impact on the performance of the
scheduler.
Closes #2021 from criccomini/AIRFLOW-807
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6b2a3ca2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6b2a3ca2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6b2a3ca2
Branch: refs/heads/v1-8-test
Commit: 6b2a3ca2ee4ee3415ef72ea1fa3fc694350e9efc
Parents: 5479ac8
Author: Chris Riccomini <ch...@wepay.com>
Authored: Wed Jan 25 22:54:09 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jan 25 22:54:18 2017 +0100
----------------------------------------------------------------------
airflow/jobs.py | 6 ++++
...7_add_dag_id_state_index_on_dag_run_table.py | 37 ++++++++++++++++++++
2 files changed, 43 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6b2a3ca2/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 0ac3607..201d87f 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -538,12 +538,18 @@ class SchedulerJob(BaseJob):
Where assuming that the scheduler runs often, so we only check for
tasks that should have succeeded in the past hour.
"""
+ if not any([ti.sla for ti in dag.tasks]):
+ self.logger.info("Skipping SLA check for {} because "
+ "no tasks in DAG have SLAs".format(dag))
+ return
+
TI = models.TaskInstance
sq = (
session
.query(
TI.task_id,
func.max(TI.execution_date).label('max_ti'))
+ .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
.filter(TI.dag_id == dag.dag_id)
.filter(TI.state == State.SUCCESS)
.filter(TI.task_id.in_(dag.task_ids))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6b2a3ca2/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
new file mode 100644
index 0000000..c500966
--- /dev/null
+++ b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py
@@ -0,0 +1,37 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Add dag_id/state index on dag_run table
+
+Revision ID: 127d2bf2dfa7
+Revises: 1a5a9e6bf2b5
+Create Date: 2017-01-25 11:43:51.635667
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '127d2bf2dfa7'
+down_revision = '1a5a9e6bf2b5'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+
+def upgrade():
+ op.create_index('dag_id_state', 'dag_run', ['dag_id', 'state'], unique=False)
+
+
+def downgrade():
+ op.drop_index('dag_id_state', table_name='dag_run')
+
[6/6] incubator-airflow git commit: Merge branch 'master' into
v1-8-test
Posted by bo...@apache.org.
Merge branch 'master' into v1-8-test
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/76cbfeb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/76cbfeb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/76cbfeb3
Branch: refs/heads/v1-8-test
Commit: 76cbfeb3af05d2eaf2378430a0092454fbb07e03
Parents: 1ea4c53 6b2a3ca
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Jan 25 22:59:50 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jan 25 22:59:50 2017 +0100
----------------------------------------------------------------------
airflow/hooks/postgres_hook.py | 4 +-
airflow/jobs.py | 39 ++++++++++----------
...7_add_dag_id_state_index_on_dag_run_table.py | 37 +++++++++++++++++++
.../1a5a9e6bf2b5_add_state_index_for_dagruns.py | 37 -------------------
tests/operators/operators.py | 24 +++++++++---
5 files changed, 75 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
[3/6] incubator-airflow git commit: [AIRFLOW-803] Revert join with
dag_runs in _execute_task_instances
Posted by bo...@apache.org.
[AIRFLOW-803] Revert join with dag_runs in _execute_task_instances
TaskInstances will be set to 'scheduled' if they meet the criteria to run,
including the ones up for retry. No task_instance will be sent to the executor
in a state other than 'scheduled'.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4edf9138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4edf9138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4edf9138
Branch: refs/heads/v1-8-test
Commit: 4edf9138d03fa4cbce5a1fc9059735d6f80f80f2
Parents: ac9167f
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Jan 25 11:54:30 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jan 25 13:29:14 2017 +0100
----------------------------------------------------------------------
airflow/jobs.py | 6 +---
.../1a5a9e6bf2b5_add_state_index_for_dagruns.py | 37 --------------------
2 files changed, 1 insertion(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4edf9138/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 8bb93bb..3e3229f 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -34,7 +34,7 @@ import time
from time import sleep
import psutil
-from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_
+from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import make_transient
from tabulate import tabulate
@@ -955,10 +955,6 @@ class SchedulerJob(BaseJob):
.query(TI)
.filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
.filter(TI.state.in_(states))
- .join(DagRun, and_(TI.dag_id == DagRun.dag_id,
- TI.execution_date == DagRun.execution_date,
- DagRun.state == State.RUNNING,
- DagRun.run_id.like(DagRun.ID_PREFIX + '%')))
.all()
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4edf9138/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py b/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py
deleted file mode 100644
index 29ffaf1..0000000
--- a/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Add state index for dagruns to allow the quick lookup of active dagruns
-
-Revision ID: 1a5a9e6bf2b5
-Revises: 5e7d17757c7a
-Create Date: 2017-01-17 10:22:53.193711
-
-"""
-
-# revision identifiers, used by Alembic.
-revision = '1a5a9e6bf2b5'
-down_revision = '5e7d17757c7a'
-branch_labels = None
-depends_on = None
-
-from alembic import op
-import sqlalchemy as sa
-
-
-def upgrade():
- op.create_index('dr_state', 'dag_run', ['state'], unique=False)
-
-
-def downgrade():
- op.drop_index('state', table_name='dag_run')
[4/6] incubator-airflow git commit: Merge branch 'fix_localtaskjob'
Posted by bo...@apache.org.
Merge branch 'fix_localtaskjob'
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5479ac8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5479ac8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5479ac8d
Branch: refs/heads/v1-8-test
Commit: 5479ac8d4ab9bbb4bab839df9f0d5cf0c0e80388
Parents: 4edf913 24d641b
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Jan 25 21:36:28 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jan 25 21:36:28 2017 +0100
----------------------------------------------------------------------
airflow/jobs.py | 27 ++++++++++++---------------
1 file changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5479ac8d/airflow/jobs.py
----------------------------------------------------------------------
[2/6] incubator-airflow git commit: [AIRFLOW-798] Check return_code
before forcing termination
Posted by bo...@apache.org.
[AIRFLOW-798] Check return_code before forcing termination
LocalTaskJob could still log an error and self-destruct,
even though the underlying process had already exited.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/24d641bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/24d641bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/24d641bc
Branch: refs/heads/v1-8-test
Commit: 24d641bc106c112f86771bd394d877dd4df578f9
Parents: a2b0ea3
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Jan 24 12:01:44 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 24 16:40:22 2017 +0100
----------------------------------------------------------------------
airflow/jobs.py | 27 ++++++++++++---------------
1 file changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/24d641bc/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 8bb93bb..978fc35 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2081,28 +2081,25 @@ class LocalTaskJob(BaseJob):
# task is already terminating, let it breathe
return
- # Suicide pill
- TI = models.TaskInstance
+ self.task_instance.refresh_from_db()
ti = self.task_instance
- new_ti = session.query(TI).filter(
- TI.dag_id == ti.dag_id, TI.task_id == ti.task_id,
- TI.execution_date == ti.execution_date).scalar()
- if new_ti.state == State.RUNNING:
+ if ti.state == State.RUNNING:
self.was_running = True
fqdn = socket.getfqdn()
- if not (fqdn == new_ti.hostname and
- self.task_runner.process.pid == new_ti.pid):
- logging.warning("Recorded hostname and pid of {new_ti.hostname} "
- "and {new_ti.pid} do not match this instance's "
+ if not (fqdn == ti.hostname and
+ self.task_runner.process.pid == ti.pid):
+ logging.warning("Recorded hostname and pid of {ti.hostname} "
+ "and {ti.pid} do not match this instance's "
"which are {fqdn} and "
- "{self.task_runner.process.pid}. Taking the poison pill. "
- "So long."
+ "{self.task_runner.process.pid}. "
+ "Taking the poison pill. So long."
.format(**locals()))
raise AirflowException("Another worker/process is running this job")
- elif self.was_running and hasattr(self.task_runner, 'process'):
+ elif (self.was_running
+ and self.task_runner.return_code() is None
+ and hasattr(self.task_runner, 'process')):
logging.warning(
"State of this instance has been externally set to "
- "{self.task_instance.state}. "
- "Taking the poison pill. So long.".format(**locals()))
+ "{}. Taking the poison pill. So long.".format(ti.state))
self.task_runner.terminate()
self.terminating = True