You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/08/06 16:43:00 UTC

[jira] [Commented] (AIRFLOW-2811) Fix scheduler_ops_metrics.py to work

    [ https://issues.apache.org/jira/browse/AIRFLOW-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570456#comment-16570456 ] 

ASF GitHub Bot commented on AIRFLOW-2811:
-----------------------------------------

feng-tao closed pull request #3653: [AIRFLOW-2811] Fix scheduler_ops_metrics.py to work
URL: https://github.com/apache/incubator-airflow/pull/3653
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):

diff --git a/scripts/perf/scheduler_ops_metrics.py b/scripts/perf/scheduler_ops_metrics.py
index d4e472d34f..7928649977 100644
--- a/scripts/perf/scheduler_ops_metrics.py
+++ b/scripts/perf/scheduler_ops_metrics.py
@@ -17,7 +17,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from datetime import datetime
 import logging
 import pandas as pd
 import sys
@@ -25,6 +24,7 @@
 from airflow import configuration, settings
 from airflow.jobs import SchedulerJob
 from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.utils import timezone
 from airflow.utils.state import State
 
 SUBDIR = 'scripts/perf/dags'
@@ -53,7 +53,10 @@ class SchedulerMetricsJob(SchedulerJob):
     run on remote systems and spend the majority of their time on I/O wait.
 
     To Run:
-        $ python scripts/perf/scheduler_ops_metrics.py
+        $ python scripts/perf/scheduler_ops_metrics.py [timeout]
+
+    You can specify timeout in seconds as an optional parameter.
+    Its default value is 6 seconds.
     """
     __mapper_args__ = {
         'polymorphic_identity': 'SchedulerMetricsJob'
@@ -71,7 +74,7 @@ def print_stats(self):
             .filter(TI.dag_id.in_(DAG_IDS))
             .all()
         )
-        successful_tis = filter(lambda x: x.state == State.SUCCESS, tis)
+        successful_tis = [x for x in tis if x.state == State.SUCCESS]
         ti_perf = [(ti.dag_id, ti.task_id, ti.execution_date,
                     (ti.queued_dttm - self.start_date).total_seconds(),
                     (ti.start_date - self.start_date).total_seconds(),
@@ -117,11 +120,11 @@ def heartbeat(self):
         dagbag = DagBag(SUBDIR)
         dags = [dagbag.dags[dag_id] for dag_id in DAG_IDS]
         # the tasks in perf_dag_1 and per_dag_2 have a daily schedule interval.
-        num_task_instances = sum([(datetime.today() - task.start_date).days
+        num_task_instances = sum([(timezone.utcnow() - task.start_date).days
                                  for dag in dags for task in dag.tasks])
 
         if (len(successful_tis) == num_task_instances or
-                (datetime.now()-self.start_date).total_seconds() >
+                (timezone.utcnow() - self.start_date).total_seconds() >
                 MAX_RUNTIME_SECS):
             if (len(successful_tis) == num_task_instances):
                 self.log.info("All tasks processed! Printing stats.")
@@ -178,6 +181,17 @@ def set_dags_paused_state(is_paused):
 
 
 def main():
+    global MAX_RUNTIME_SECS
+    if len(sys.argv) > 1:
+        try:
+            max_runtime_secs = int(sys.argv[1])
+            if max_runtime_secs < 1:
+                raise ValueError
+            MAX_RUNTIME_SECS = max_runtime_secs
+        except ValueError:
+            logging.error('Specify a positive integer for timeout.')
+            sys.exit(1)
+
     configuration.load_test_config()
 
     set_dags_paused_state(False)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Fix scheduler_ops_metrics.py to work
> ------------------------------------
>
>                 Key: AIRFLOW-2811
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2811
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Kengo Seki
>            Assignee: Kengo Seki
>            Priority: Major
>
> I tried to run {{scripts/perf/scheduler_ops_metrics.py}} but it failed with the following error:
> {code}
> $ python scripts/perf/scheduler_ops_metrics.py 
> (snip)
> Traceback (most recent call last):
>   File "scripts/perf/scheduler_ops_metrics.py", line 192, in <module>
>     main()
>   File "scripts/perf/scheduler_ops_metrics.py", line 188, in main
>     job.run()
>   File "/home/sekikn/dev/incubator-airflow/airflow/jobs.py", line 202, in run
>     self._execute()
>   File "/home/sekikn/dev/incubator-airflow/airflow/jobs.py", line 1584, in _execute
>     self._execute_helper(processor_manager)
>   File "/home/sekikn/dev/incubator-airflow/airflow/jobs.py", line 1714, in _execute_helper
>     self.heartbeat()
>   File "scripts/perf/scheduler_ops_metrics.py", line 121, in heartbeat
>     for dag in dags for task in dag.tasks])
> TypeError: can't subtract offset-naive and offset-aware datetimes
> {code}
> Also, it'd be nice if {{MAX_RUNTIME_SECS}} were configurable, since the default value (6 seconds) is too short for all TaskInstances to finish in my environment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)