You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@liminal.apache.org by jb...@apache.org on 2020/07/20 06:25:20 UTC
[incubator-liminal] 40/43: fix jobEndStatus tasks state check
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-liminal.git
commit fac89af6108d5d426ed415a997eab88d5ada761a
Author: zionrubin <zi...@naturalint.com>
AuthorDate: Tue Jun 23 15:16:55 2020 +0300
fix jobEndStatus tasks state check
---
.../airflow/operators/job_status_operator.py | 38 +++++++++++++++++++---
1 file changed, 33 insertions(+), 5 deletions(-)
diff --git a/rainbow/runners/airflow/operators/job_status_operator.py b/rainbow/runners/airflow/operators/job_status_operator.py
index ae9382a..8ea997d 100644
--- a/rainbow/runners/airflow/operators/job_status_operator.py
+++ b/rainbow/runners/airflow/operators/job_status_operator.py
@@ -17,12 +17,16 @@
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
+from typing import Any
import pytz
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.exceptions import AirflowException
+from airflow.lineage import apply_lineage
from airflow.models import BaseOperator
+from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
+from airflow.utils.state import State
class JobStatusOperator(BaseOperator):
@@ -94,6 +98,7 @@ class JobEndOperator(JobStatusOperator):
super().__init__(backends=backends, *args, **kwargs)
self.namespace = namespace
self.application_name = application_name
+ self.__job_result = 0
def metrics(self, context):
duration = round((pytz.utc.localize(datetime.utcnow()) - context[
@@ -102,19 +107,40 @@ class JobEndOperator(JobStatusOperator):
self.log.info('Elapsed time: %s' % duration)
task_instances = context['dag_run'].get_task_instances()
- task_states = [task_instance.state for task_instance in task_instances[:-1]]
- job_result = 0
- if all(state == 'success' for state in task_states):
- job_result = 1
+ task_states = [self.__log_and_get_state(task_instance)
+ for task_instance in task_instances
+ if task_instance.task_id != context['task_instance'].task_id]
+
+ if all((state == State.SUCCESS or state == State.SKIPPED) for state in task_states):
+ self.__job_result = 1
return [
- Metric(self.namespace, 'JobResult', job_result,
+ Metric(self.namespace, 'JobResult', self.__job_result,
[Tag('ApplicationName', self.application_name)]),
Metric(self.namespace, 'JobDuration', duration,
[Tag('ApplicationName', self.application_name)])
]
+ def __log_and_get_state(self, task_instance):
+ state = task_instance.state
+
+ self.log.info(f'{task_instance.task_id} finished with state: {state}')
+
+ return state
+
+ @apply_lineage
+ @provide_session
+ def post_execute(self, context: Any, result: Any = None, session=None):
+ if self.__job_result == 0:
+ self.log.info("Failing this DAG run due to task failure.")
+
+ dag_run = context['ti'].get_dagrun()
+ dag_run.end_date = datetime.utcnow()
+ dag_run.state = State.FAILED
+
+ session.merge(dag_run)
+
# noinspection PyAbstractClass
class CloudWatchHook(AwsHook):
@@ -150,6 +176,8 @@ class CloudWatchHook(AwsHook):
]
)
+ self.log.info(f'Published metric: {metric.name} with value: {value}')
+
class Metric:
"""