You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/20 23:07:05 UTC

[1/4] incubator-airflow git commit: [AIRFLOW-778] Fix completey broken MetastorePartitionSensor

Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test 38b442da1 -> ecbe3ce6c


[AIRFLOW-778] Fix completey broken MetastorePartitionSensor

MetastorePartitionSensor always throws an
exception on initialization due to
72cc8b3006576153aa30d27643807b4ae5dfb593 . This PR
reverts the breaking part of this commit.
Looks like the tests for this are only run if an
explicit flag is set which is how this got past
CI.

Closes #2005 from
aoen/ddavydov/fix_metastore_partition_sensor


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/241fd270
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/241fd270
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/241fd270

Branch: refs/heads/v1-8-test
Commit: 241fd2709fa948336018cfa1032bcea2c8b8d1bd
Parents: 08e28ab
Author: Dan Davydov <da...@airbnb.com>
Authored: Fri Jan 20 14:28:06 2017 -0800
Committer: Dan Davydov <da...@airbnb.com>
Committed: Fri Jan 20 14:28:24 2017 -0800

----------------------------------------------------------------------
 airflow/operators/sensors.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/241fd270/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index f5dd148..5fbd21c 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -152,7 +152,12 @@ class MetastorePartitionSensor(SqlSensor):
         self.schema = schema
         self.first_poke = True
         self.conn_id = mysql_conn_id
-        super(MetastorePartitionSensor, self).__init__(*args, **kwargs)
+        # TODO(aoen): We shouldn't be using SqlSensor here but MetastorePartitionSensor.
+        # The problem is the way apply_defaults works isn't compatible with inheritance.
+        # The inheritance model needs to be reworked in order to support overriding args/
+        # kwargs with arguments here, then 'conn_id' and 'sql' can be passed into the
+        # constructor below and apply_defaults will no longer throw an exception.
+        super(SqlSensor, self).__init__(*args, **kwargs)
 
     def poke(self, context):
         if self.first_poke:


[2/4] incubator-airflow git commit: [AIRFLOW-779] Task should fail with specific message when deleted

Posted by bo...@apache.org.
[AIRFLOW-779] Task should fail with specific message when deleted

Testing Done:
- Killed a task while it was running using the
task instances UI, verified behavior is the same
as before, and logging worked

Closes #2006 from saguziel/aguziel-terminate-
nonexistent


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/92215875
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/92215875
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/92215875

Branch: refs/heads/v1-8-test
Commit: 9221587514e2a0155cdced2d3ae50129b0793a10
Parents: 241fd27
Author: Alex Guziel <al...@airbnb.com>
Authored: Fri Jan 20 14:32:29 2017 -0800
Committer: Dan Davydov <da...@airbnb.com>
Committed: Fri Jan 20 14:32:31 2017 -0800

----------------------------------------------------------------------
 airflow/jobs.py | 16 +++++-----------
 1 file changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92215875/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index f1de333..350c6d4 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2016,10 +2016,6 @@ class LocalTaskJob(BaseJob):
         self.pickle_id = pickle_id
         self.mark_success = mark_success
 
-        # terminating state is used so that a job don't try to
-        # terminate multiple times
-        self.terminating = False
-
         # Keeps track of the fact that the task instance has been observed
         # as running at least once
         self.was_running = False
@@ -2083,17 +2079,16 @@ class LocalTaskJob(BaseJob):
     def heartbeat_callback(self, session=None):
         """Self destruct task if state has been moved away from running externally"""
 
-        if self.terminating:
-            # task is already terminating, let it breathe
-            return
-
         # Suicide pill
         TI = models.TaskInstance
         ti = self.task_instance
         new_ti = session.query(TI).filter(
             TI.dag_id == ti.dag_id, TI.task_id == ti.task_id,
             TI.execution_date == ti.execution_date).scalar()
-        if new_ti.state == State.RUNNING:
+        if new_ti is None:
+            logging.warning("Task instance does not exist in DB. Terminating")
+            raise AirflowException("Task instance does not exist in DB")
+        elif new_ti.state == State.RUNNING:
             self.was_running = True
             fqdn = socket.getfqdn()
             if not (fqdn == new_ti.hostname and
@@ -2110,5 +2105,4 @@ class LocalTaskJob(BaseJob):
                 "State of this instance has been externally set to "
                 "{self.task_instance.state}. "
                 "Taking the poison pill. So long.".format(**locals()))
-            self.task_runner.terminate()
-            self.terminating = True
+            raise AirflowException("Task instance state has been changed externally")


[4/4] incubator-airflow git commit: Merge branch 'master' into v1-8-test

Posted by bo...@apache.org.
Merge branch 'master' into v1-8-test


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ecbe3ce6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ecbe3ce6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ecbe3ce6

Branch: refs/heads/v1-8-test
Commit: ecbe3ce6c5a13aa263c475bc080e5e77484035aa
Parents: 38b442d 927f30c
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Jan 21 00:06:55 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Jan 21 00:06:55 2017 +0100

----------------------------------------------------------------------
 airflow/operators/sensors.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[3/4] incubator-airflow git commit: Revert "[AIRFLOW-779] Task should fail with specific message when deleted"

Posted by bo...@apache.org.
Revert "[AIRFLOW-779] Task should fail with specific message when deleted"

This reverts commit 9221587514e2a0155cdced2d3ae50129b0793a10.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/927f30c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/927f30c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/927f30c9

Branch: refs/heads/v1-8-test
Commit: 927f30c9b15bc3ccd7ea8aa53abf4c9d82b3bef5
Parents: 9221587
Author: Dan Davydov <da...@airbnb.com>
Authored: Fri Jan 20 14:49:53 2017 -0800
Committer: Dan Davydov <da...@airbnb.com>
Committed: Fri Jan 20 14:49:53 2017 -0800

----------------------------------------------------------------------
 airflow/jobs.py | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/927f30c9/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 350c6d4..f1de333 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2016,6 +2016,10 @@ class LocalTaskJob(BaseJob):
         self.pickle_id = pickle_id
         self.mark_success = mark_success
 
+        # terminating state is used so that a job don't try to
+        # terminate multiple times
+        self.terminating = False
+
         # Keeps track of the fact that the task instance has been observed
         # as running at least once
         self.was_running = False
@@ -2079,16 +2083,17 @@ class LocalTaskJob(BaseJob):
     def heartbeat_callback(self, session=None):
         """Self destruct task if state has been moved away from running externally"""
 
+        if self.terminating:
+            # task is already terminating, let it breathe
+            return
+
         # Suicide pill
         TI = models.TaskInstance
         ti = self.task_instance
         new_ti = session.query(TI).filter(
             TI.dag_id == ti.dag_id, TI.task_id == ti.task_id,
             TI.execution_date == ti.execution_date).scalar()
-        if new_ti is None:
-            logging.warning("Task instance does not exist in DB. Terminating")
-            raise AirflowException("Task instance does not exist in DB")
-        elif new_ti.state == State.RUNNING:
+        if new_ti.state == State.RUNNING:
             self.was_running = True
             fqdn = socket.getfqdn()
             if not (fqdn == new_ti.hostname and
@@ -2105,4 +2110,5 @@ class LocalTaskJob(BaseJob):
                 "State of this instance has been externally set to "
                 "{self.task_instance.state}. "
                 "Taking the poison pill. So long.".format(**locals()))
-            raise AirflowException("Task instance state has been changed externally")
+            self.task_runner.terminate()
+            self.terminating = True