You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/12/22 02:22:00 UTC

[jira] [Commented] (AIRFLOW-3153) send dag last_run to statsd

    [ https://issues.apache.org/jira/browse/AIRFLOW-3153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16727178#comment-16727178 ] 

ASF GitHub Bot commented on AIRFLOW-3153:
-----------------------------------------

stale[bot] closed pull request #3997: [AIRFLOW-3153] send dag last_run to statsd
URL: https://github.com/apache/incubator-airflow/pull/3997
 
 
   

This is a PR from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):

diff --git a/airflow/jobs.py b/airflow/jobs.py
index da1089d690..94ec4458d8 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -580,7 +580,8 @@ def __init__(
         self.using_sqlite = False
         if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
             if self.max_threads > 1:
-                self.log.error("Cannot use more than 1 thread when using sqlite. Setting max_threads to 1")
+                self.log.error("Cannot use more than 1 thread when using sqlite. "
+                               "Setting max_threads to 1")
             self.max_threads = 1
             self.using_sqlite = True
 
@@ -1026,7 +1027,8 @@ def _change_state_for_tis_without_dagrun(self,
 
         if tis_changed > 0:
             self.log.warning(
-                "Set %s task instances to state=%s as their associated DagRun was not in RUNNING state",
+                "Set %s task instances to state=%s "
+                "as their associated DagRun was not in RUNNING state",
                 tis_changed, new_state
             )
 
@@ -1201,7 +1203,8 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
                                       " this task has been reached.", task_instance)
                         continue
                     else:
-                        task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1
+                        task_concurrency_map[(task_instance.dag_id,
+                                              task_instance.task_id)] += 1
 
                 if self.executor.has_task(task_instance):
                     self.log.debug(
@@ -1505,6 +1508,8 @@ def _log_file_processing_stats(self,
                    "Last Run"]
 
         rows = []
+        dags_folder = conf.get('core', 'dags_folder').rstrip(os.sep)
+
         for file_path in known_file_paths:
             last_runtime = processor_manager.get_last_runtime(file_path)
             processor_pid = processor_manager.get_pid(file_path)
@@ -1513,6 +1518,16 @@ def _log_file_processing_stats(self,
                        if processor_start_time else None)
             last_run = processor_manager.get_last_finish_time(file_path)
 
+            file_name = file_path[len(dags_folder) + 1:]
+            dag_name = os.path.splitext(file_name)[0].replace(os.sep, '.')
+            if last_runtime is not None:
+                Stats.gauge('last_runtime.{}'.format(dag_name), last_runtime)
+            if last_run is not None:
+                unixtime = last_run.strftime("%s")
+                seconds_ago = (timezone.utcnow() - last_run).total_seconds()
+                Stats.gauge('last_run.unixtime.{}'.format(dag_name), unixtime)
+                Stats.gauge('last_run.seconds_ago.{}'.format(dag_name), seconds_ago)
+
             rows.append((file_path,
                          processor_pid,
                          runtime,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> send dag last_run to statsd
> ---------------------------
>
>                 Key: AIRFLOW-3153
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3153
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Tao Feng
>            Assignee: Tao Feng
>            Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)