Posted to dev@liminal.apache.org by jb...@apache.org on 2020/07/20 06:25:20 UTC

[incubator-liminal] 40/43: fix jobEndStatus tasks state check

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git

commit fac89af6108d5d426ed415a997eab88d5ada761a
Author: zionrubin <zi...@naturalint.com>
AuthorDate: Tue Jun 23 15:16:55 2020 +0300

    fix jobEndStatus tasks state check
---
 .../airflow/operators/job_status_operator.py       | 38 +++++++++++++++++++---
 1 file changed, 33 insertions(+), 5 deletions(-)

diff --git a/rainbow/runners/airflow/operators/job_status_operator.py b/rainbow/runners/airflow/operators/job_status_operator.py
index ae9382a..8ea997d 100644
--- a/rainbow/runners/airflow/operators/job_status_operator.py
+++ b/rainbow/runners/airflow/operators/job_status_operator.py
@@ -17,12 +17,16 @@
 # specific language governing permissions and limitations
 # under the License.
 from datetime import datetime
+from typing import Any
 
 import pytz
 from airflow.contrib.hooks.aws_hook import AwsHook
 from airflow.exceptions import AirflowException
+from airflow.lineage import apply_lineage
 from airflow.models import BaseOperator
+from airflow.utils.db import provide_session
 from airflow.utils.decorators import apply_defaults
+from airflow.utils.state import State
 
 
 class JobStatusOperator(BaseOperator):
@@ -94,6 +98,7 @@ class JobEndOperator(JobStatusOperator):
         super().__init__(backends=backends, *args, **kwargs)
         self.namespace = namespace
         self.application_name = application_name
+        self.__job_result = 0
 
     def metrics(self, context):
         duration = round((pytz.utc.localize(datetime.utcnow()) - context[
@@ -102,19 +107,40 @@ class JobEndOperator(JobStatusOperator):
         self.log.info('Elapsed time: %s' % duration)
 
         task_instances = context['dag_run'].get_task_instances()
-        task_states = [task_instance.state for task_instance in task_instances[:-1]]
 
-        job_result = 0
-        if all(state == 'success' for state in task_states):
-            job_result = 1
+        task_states = [self.__log_and_get_state(task_instance)
+                       for task_instance in task_instances
+                       if task_instance.task_id != context['task_instance'].task_id]
+
+        if all((state == State.SUCCESS or state == State.SKIPPED) for state in task_states):
+            self.__job_result = 1
 
         return [
-            Metric(self.namespace, 'JobResult', job_result,
+            Metric(self.namespace, 'JobResult', self.__job_result,
                    [Tag('ApplicationName', self.application_name)]),
             Metric(self.namespace, 'JobDuration', duration,
                    [Tag('ApplicationName', self.application_name)])
         ]
 
+    def __log_and_get_state(self, task_instance):
+        state = task_instance.state
+
+        self.log.info(f'{task_instance.task_id} finished with state: {state}')
+
+        return state
+
+    @apply_lineage
+    @provide_session
+    def post_execute(self, context: Any, result: Any = None, session=None):
+        if self.__job_result == 0:
+            self.log.info("Failing this DAG run due to task failure.")
+
+            dag_run = context['ti'].get_dagrun()
+            dag_run.end_date = datetime.utcnow()
+            dag_run.state = State.FAILED
+
+            session.merge(dag_run)
+
 
 # noinspection PyAbstractClass
 class CloudWatchHook(AwsHook):
@@ -150,6 +176,8 @@ class CloudWatchHook(AwsHook):
             ]
         )
 
+        self.log.info(f'Published metric: {metric.name} with value: {value}')
+
 
 class Metric:
     """