You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/25 20:36:50 UTC
[1/3] incubator-airflow git commit: [AIRFLOW-798] Check return_code
before forcing termination
Repository: incubator-airflow
Updated Branches:
refs/heads/master ac9167f37 -> 5479ac8d4
[AIRFLOW-798] Check return_code before forcing termination
LocalTaskJob could still log an error en self destruct,
although the underlying process already exited.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/24d641bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/24d641bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/24d641bc
Branch: refs/heads/master
Commit: 24d641bc106c112f86771bd394d877dd4df578f9
Parents: a2b0ea3
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Jan 24 12:01:44 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 24 16:40:22 2017 +0100
----------------------------------------------------------------------
airflow/jobs.py | 27 ++++++++++++---------------
1 file changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/24d641bc/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 8bb93bb..978fc35 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2081,28 +2081,25 @@ class LocalTaskJob(BaseJob):
# task is already terminating, let it breathe
return
- # Suicide pill
- TI = models.TaskInstance
+ self.task_instance.refresh_from_db()
ti = self.task_instance
- new_ti = session.query(TI).filter(
- TI.dag_id == ti.dag_id, TI.task_id == ti.task_id,
- TI.execution_date == ti.execution_date).scalar()
- if new_ti.state == State.RUNNING:
+ if ti.state == State.RUNNING:
self.was_running = True
fqdn = socket.getfqdn()
- if not (fqdn == new_ti.hostname and
- self.task_runner.process.pid == new_ti.pid):
- logging.warning("Recorded hostname and pid of {new_ti.hostname} "
- "and {new_ti.pid} do not match this instance's "
+ if not (fqdn == ti.hostname and
+ self.task_runner.process.pid == ti.pid):
+ logging.warning("Recorded hostname and pid of {ti.hostname} "
+ "and {ti.pid} do not match this instance's "
"which are {fqdn} and "
- "{self.task_runner.process.pid}. Taking the poison pill. "
- "So long."
+ "{self.task_runner.process.pid}. "
+ "Taking the poison pill. So long."
.format(**locals()))
raise AirflowException("Another worker/process is running this job")
- elif self.was_running and hasattr(self.task_runner, 'process'):
+ elif (self.was_running
+ and self.task_runner.return_code() is None
+ and hasattr(self.task_runner, 'process')):
logging.warning(
"State of this instance has been externally set to "
- "{self.task_instance.state}. "
- "Taking the poison pill. So long.".format(**locals()))
+ "{}. Taking the poison pill. So long.".format(ti.state))
self.task_runner.terminate()
self.terminating = True
[2/3] incubator-airflow git commit: [AIRFLOW-803] Revert join with
dag_runs in _execute_task_instances
Posted by bo...@apache.org.
[AIRFLOW-803] Revert join with dag_runs in _execute_task_instances
TaskInstances will be set to 'scheduled' if they meet the criteria to run,
also the ones up for retry. No task_instance will be send to the executor
in another state than 'scheduled'.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4edf9138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4edf9138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4edf9138
Branch: refs/heads/master
Commit: 4edf9138d03fa4cbce5a1fc9059735d6f80f80f2
Parents: ac9167f
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Jan 25 11:54:30 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jan 25 13:29:14 2017 +0100
----------------------------------------------------------------------
airflow/jobs.py | 6 +---
.../1a5a9e6bf2b5_add_state_index_for_dagruns.py | 37 --------------------
2 files changed, 1 insertion(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4edf9138/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 8bb93bb..3e3229f 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -34,7 +34,7 @@ import time
from time import sleep
import psutil
-from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_
+from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import make_transient
from tabulate import tabulate
@@ -955,10 +955,6 @@ class SchedulerJob(BaseJob):
.query(TI)
.filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
.filter(TI.state.in_(states))
- .join(DagRun, and_(TI.dag_id == DagRun.dag_id,
- TI.execution_date == DagRun.execution_date,
- DagRun.state == State.RUNNING,
- DagRun.run_id.like(DagRun.ID_PREFIX + '%')))
.all()
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4edf9138/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py b/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py
deleted file mode 100644
index 29ffaf1..0000000
--- a/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Add state index for dagruns to allow the quick lookup of active dagruns
-
-Revision ID: 1a5a9e6bf2b5
-Revises: 5e7d17757c7a
-Create Date: 2017-01-17 10:22:53.193711
-
-"""
-
-# revision identifiers, used by Alembic.
-revision = '1a5a9e6bf2b5'
-down_revision = '5e7d17757c7a'
-branch_labels = None
-depends_on = None
-
-from alembic import op
-import sqlalchemy as sa
-
-
-def upgrade():
- op.create_index('dr_state', 'dag_run', ['state'], unique=False)
-
-
-def downgrade():
- op.drop_index('state', table_name='dag_run')
[3/3] incubator-airflow git commit: Merge branch 'fix_localtaskjob'
Posted by bo...@apache.org.
Merge branch 'fix_localtaskjob'
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5479ac8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5479ac8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5479ac8d
Branch: refs/heads/master
Commit: 5479ac8d4ab9bbb4bab839df9f0d5cf0c0e80388
Parents: 4edf913 24d641b
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Jan 25 21:36:28 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jan 25 21:36:28 2017 +0100
----------------------------------------------------------------------
airflow/jobs.py | 27 ++++++++++++---------------
1 file changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5479ac8d/airflow/jobs.py
----------------------------------------------------------------------