You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ma...@apache.org on 2016/06/17 21:31:08 UTC
incubator-airflow git commit: [AIRFLOW-234] make tasks that aren't
`running` self-terminate
Repository: incubator-airflow
Updated Branches:
refs/heads/master d243c003b -> 7c0f8373f
[AIRFLOW-234] make tasks that aren't `running` self-terminate
Closes #1585 from mistercrunch/undeads
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7c0f8373
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7c0f8373
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7c0f8373
Branch: refs/heads/master
Commit: 7c0f8373f59b0554d5ba15bb0e5e8669f0830313
Parents: d243c00
Author: Maxime Beauchemin <ma...@apache.org>
Authored: Fri Jun 17 14:30:55 2016 -0700
Committer: Maxime Beauchemin <ma...@gmail.com>
Committed: Fri Jun 17 14:30:55 2016 -0700
----------------------------------------------------------------------
airflow/example_dags/docker_copy_data.py | 13 ++++++
airflow/example_dags/example_bash_operator.py | 13 ++++++
airflow/example_dags/example_branch_operator.py | 13 ++++++
airflow/example_dags/example_docker_operator.py | 13 ++++++
airflow/example_dags/example_http_operator.py | 13 ++++++
airflow/example_dags/example_python_operator.py | 13 ++++++
.../example_short_circuit_operator.py | 13 ++++++
airflow/example_dags/example_subdag_operator.py | 13 ++++++
.../example_trigger_controller_dag.py | 14 +++++-
.../example_dags/example_trigger_target_dag.py | 13 ++++++
airflow/example_dags/example_xcom.py | 13 ++++++
airflow/example_dags/test_utils.py | 29 ++++++++++++
airflow/jobs.py | 46 ++++++++++++--------
airflow/models.py | 27 +++++++++---
tests/core.py | 42 +++++++++++++++++-
15 files changed, 263 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/docker_copy_data.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/docker_copy_data.py b/airflow/example_dags/docker_copy_data.py
index ccf84c1..f0789b1 100644
--- a/airflow/example_dags/docker_copy_data.py
+++ b/airflow/example_dags/docker_copy_data.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
'''
This sample "listen to directory". move the new file and print it, using docker-containers.
The following operators are being used: DockerOperator, BashOperator & ShortCircuitOperator.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index 4ab9144..c759f4d 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
from builtins import range
from airflow.operators import BashOperator, DummyOperator
from airflow.models import DAG
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_branch_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index f576d20..edd177a 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
from airflow.operators import BranchPythonOperator, DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_docker_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_docker_operator.py b/airflow/example_dags/example_docker_operator.py
index e014fe5..6bb71ff 100644
--- a/airflow/example_dags/example_docker_operator.py
+++ b/airflow/example_dags/example_docker_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
"""
from airflow import DAG
from airflow.operators import BashOperator
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py
index 4501825..41ea385 100644
--- a/airflow/example_dags/example_http_operator.py
+++ b/airflow/example_dags/example_http_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
"""
### Example HTTP operator and sensor
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index 0fc2180..a2f8abd 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
from __future__ import print_function
from builtins import range
from airflow.operators import PythonOperator
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_short_circuit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py
index 967c65e..907cf51 100644
--- a/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow/example_dags/example_short_circuit_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
from airflow.operators import ShortCircuitOperator, DummyOperator
from airflow.models import DAG
import airflow.utils.helpers
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_subdag_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py
index 120f333..57a62c6 100644
--- a/airflow/example_dags/example_subdag_operator.py
+++ b/airflow/example_dags/example_subdag_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
from datetime import datetime
from airflow.models import DAG
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_trigger_controller_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py
index 4b66ad1..b754d64 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -1,4 +1,16 @@
-
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
"""This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_trigger_target_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py
index 172003f..41a3e36 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
from airflow.operators import BashOperator, PythonOperator
from airflow.models import DAG
from datetime import datetime
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_xcom.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index 2d9c087..71cd44e 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
from __future__ import print_function
import airflow
from datetime import datetime, timedelta
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/test_utils.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py
new file mode 100644
index 0000000..38e50d0
--- /dev/null
+++ b/airflow/example_dags/test_utils.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Used for unit tests"""
+from airflow.operators import BashOperator
+from airflow.models import DAG
+from datetime import datetime
+
+dag = DAG(
+ dag_id='test_utils',
+ schedule_interval=None,
+)
+
+task = BashOperator(
+ task_id='sleeps_forever',
+ dag=dag,
+ bash_command="sleep 10000000000",
+ start_date=datetime(2016, 1, 1),
+ owner='airflow')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 1e583ac..0713bbe 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -19,7 +19,7 @@ from __future__ import unicode_literals
from past.builtins import basestring
from collections import defaultdict, Counter
-from datetime import datetime
+from datetime import datetime, timedelta
import getpass
import logging
import socket
@@ -116,7 +116,7 @@ class BaseJob(Base, LoggingMixin):
'''
pass
- def heartbeat_callback(self):
+ def heartbeat_callback(self, session=None):
pass
def heartbeat(self):
@@ -139,7 +139,7 @@ class BaseJob(Base, LoggingMixin):
sleep at all.
'''
session = settings.Session()
- job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
+ job = session.query(BaseJob).filter_by(id=self.id).one()
if job.state == State.SHUTDOWN:
self.kill()
@@ -154,9 +154,9 @@ class BaseJob(Base, LoggingMixin):
session.merge(job)
session.commit()
- session.close()
- self.heartbeat_callback()
+ self.heartbeat_callback(session=session)
+ session.close()
self.logger.debug('[heart] Boom.')
def run(self):
@@ -378,7 +378,8 @@ class SchedulerJob(BaseJob):
filename=filename, stacktrace=stacktrace))
session.commit()
- def schedule_dag(self, dag):
+ @provide_session
+ def schedule_dag(self, dag, session=None):
"""
This method checks whether a new DagRun needs to be created
for a DAG based on scheduling interval
@@ -386,7 +387,6 @@ class SchedulerJob(BaseJob):
"""
if dag.schedule_interval:
DagRun = models.DagRun
- session = settings.Session()
active_runs = DagRun.find(
dag_id=dag.dag_id,
state=State.RUNNING,
@@ -799,10 +799,10 @@ class SchedulerJob(BaseJob):
finally:
settings.Session.remove()
executor.end()
-
session.close()
- def heartbeat_callback(self):
+ @provide_session
+ def heartbeat_callback(self, session=None):
Stats.gauge('scheduler_heartbeat', 1, 1)
@@ -1093,6 +1093,15 @@ class LocalTaskJob(BaseJob):
self.pool = pool
self.pickle_id = pickle_id
self.mark_success = mark_success
+
+ # terminating state is used so that a job doesn't try to
+ # terminate multiple times
+ self.terminating = False
+
+ # Keeps track of the fact that the task instance has been observed
+ # as running at least once
+ self.was_running = False
+
super(LocalTaskJob, self).__init__(*args, **kwargs)
def _execute(self):
@@ -1115,23 +1124,26 @@ class LocalTaskJob(BaseJob):
def on_kill(self):
self.process.terminate()
- """
- def heartbeat_callback(self):
- if datetime.now() - self.start_date < timedelta(seconds=300):
+ @provide_session
+ def heartbeat_callback(self, session=None):
+ """Self destruct task if state has been moved away from running externally"""
+
+ if self.terminating:
+ # task is already terminating, let it breathe
return
+
# Suicide pill
TI = models.TaskInstance
ti = self.task_instance
- session = settings.Session()
state = session.query(TI.state).filter(
TI.dag_id==ti.dag_id, TI.task_id==ti.task_id,
TI.execution_date==ti.execution_date).scalar()
- session.commit()
- session.close()
- if state != State.RUNNING:
+ if state == State.RUNNING:
+ self.was_running = True
+ elif self.was_running and hasattr(self, 'process'):
logging.warning(
"State of this instance has been externally set to "
"{self.task_instance.state}. "
"Taking the poison pill. So long.".format(**locals()))
self.process.terminate()
- """
+ self.terminating = True
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 38359f7..09d880e 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1179,17 +1179,32 @@ class TaskInstance(Base):
def run(
self,
verbose=True,
- ignore_dependencies=False, # Doesn't check for deps, just runs
- ignore_depends_on_past=False, # Ignore depends_on_past but respect
- # other deps
- force=False, # Disregards previous successes
- mark_success=False, # Don't run the task, act as if it succeeded
- test_mode=False, # Doesn't record success or failure in the DB
+ ignore_dependencies=False,
+ ignore_depends_on_past=False,
+ force=False,
+ mark_success=False,
+ test_mode=False,
job_id=None,
pool=None,
session=None):
"""
Runs the task instance.
+
+ :param verbose: whether to turn on more verbose logging
+ :type verbose: boolean
+ :param ignore_dependencies: Doesn't check for deps, just runs
+ :type ignore_dependencies: boolean
+ :param ignore_depends_on_past: Ignore depends_on_past but respect
+ other dependencies
+ :type ignore_depends_on_past: boolean
+ :param force: Forces a run regardless of previous success
+ :type force: boolean
+ :param mark_success: Don't run the task, mark its state as success
+ :type mark_success: boolean
+ :param test_mode: Doesn't record success or failure in the DB
+ :type test_mode: boolean
+ :param pool: specifies the pool to use to run the task instance
+ :type pool: str
"""
task = self.task
self.pool = pool or task.pool
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 5e6a4fd..2ab14ea 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -19,6 +19,7 @@ import json
import os
import re
import unittest
+import multiprocessing
import mock
import tempfile
from datetime import datetime, time, timedelta
@@ -49,7 +50,7 @@ from airflow.configuration import AirflowConfigException
import six
-NUM_EXAMPLE_DAGS = 15
+NUM_EXAMPLE_DAGS = 16
DEV_NULL = '/dev/null'
TEST_DAG_FOLDER = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'dags')
@@ -629,6 +630,45 @@ class CoreTest(unittest.TestCase):
trigger_rule="non_existant",
dag=self.dag)
+ def test_terminate_task(self):
+ """If a task instance's db state gets deleted, it should fail"""
+ TI = models.TaskInstance
+ dag = self.dagbag.dags.get('test_utils')
+ task = dag.task_dict.get('sleeps_forever')
+
+ ti = TI(task=task, execution_date=DEFAULT_DATE)
+ job = jobs.LocalTaskJob(
+ task_instance=ti, force=True, executor=SequentialExecutor())
+
+ # Running task instance asynchronously
+ p = multiprocessing.Process(target=job.run)
+ p.start()
+ sleep(5)
+ settings.engine.dispose()
+ session = settings.Session()
+ ti.refresh_from_db(session=session)
+ # making sure it's actually running
+ assert State.RUNNING == ti.state
+ ti = (
+ session.query(TI)
+ .filter_by(
+ dag_id=task.dag_id,
+ task_id=task.task_id,
+ execution_date=DEFAULT_DATE)
+ .one()
+ )
+ # deleting the instance should result in a failure
+ session.delete(ti)
+ session.commit()
+ # waiting for the async task to finish
+ p.join()
+
+ # making sure that the task ended up as failed
+ ti.refresh_from_db(session=session)
+ assert State.FAILED == ti.state
+ session.close()
+
+
class CliTests(unittest.TestCase):
def setUp(self):
configuration.test_mode()