You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/06 16:42:03 UTC
[GitHub] feng-tao closed pull request #3653: [AIRFLOW-2811] Fix scheduler_ops_metrics.py to work
feng-tao closed pull request #3653: [AIRFLOW-2811] Fix scheduler_ops_metrics.py to work
URL: https://github.com/apache/incubator-airflow/pull/3653
This is a PR merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged, the diff
is reproduced below for provenance:
Because this pull request comes from a fork, the diff is included below;
GitHub would not otherwise display it in this notification:
diff --git a/scripts/perf/scheduler_ops_metrics.py b/scripts/perf/scheduler_ops_metrics.py
index d4e472d34f..7928649977 100644
--- a/scripts/perf/scheduler_ops_metrics.py
+++ b/scripts/perf/scheduler_ops_metrics.py
@@ -17,7 +17,6 @@
# specific language governing permissions and limitations
# under the License.
-from datetime import datetime
import logging
import pandas as pd
import sys
@@ -25,6 +24,7 @@
from airflow import configuration, settings
from airflow.jobs import SchedulerJob
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.utils import timezone
from airflow.utils.state import State
SUBDIR = 'scripts/perf/dags'
@@ -53,7 +53,10 @@ class SchedulerMetricsJob(SchedulerJob):
run on remote systems and spend the majority of their time on I/O wait.
To Run:
- $ python scripts/perf/scheduler_ops_metrics.py
+ $ python scripts/perf/scheduler_ops_metrics.py [timeout]
+
+ You can specify timeout in seconds as an optional parameter.
+ Its default value is 6 seconds.
"""
__mapper_args__ = {
'polymorphic_identity': 'SchedulerMetricsJob'
@@ -71,7 +74,7 @@ def print_stats(self):
.filter(TI.dag_id.in_(DAG_IDS))
.all()
)
- successful_tis = filter(lambda x: x.state == State.SUCCESS, tis)
+ successful_tis = [x for x in tis if x.state == State.SUCCESS]
ti_perf = [(ti.dag_id, ti.task_id, ti.execution_date,
(ti.queued_dttm - self.start_date).total_seconds(),
(ti.start_date - self.start_date).total_seconds(),
@@ -117,11 +120,11 @@ def heartbeat(self):
dagbag = DagBag(SUBDIR)
dags = [dagbag.dags[dag_id] for dag_id in DAG_IDS]
# the tasks in perf_dag_1 and per_dag_2 have a daily schedule interval.
- num_task_instances = sum([(datetime.today() - task.start_date).days
+ num_task_instances = sum([(timezone.utcnow() - task.start_date).days
for dag in dags for task in dag.tasks])
if (len(successful_tis) == num_task_instances or
- (datetime.now()-self.start_date).total_seconds() >
+ (timezone.utcnow() - self.start_date).total_seconds() >
MAX_RUNTIME_SECS):
if (len(successful_tis) == num_task_instances):
self.log.info("All tasks processed! Printing stats.")
@@ -178,6 +181,17 @@ def set_dags_paused_state(is_paused):
def main():
+ global MAX_RUNTIME_SECS
+ if len(sys.argv) > 1:
+ try:
+ max_runtime_secs = int(sys.argv[1])
+ if max_runtime_secs < 1:
+ raise ValueError
+ MAX_RUNTIME_SECS = max_runtime_secs
+ except ValueError:
+ logging.error('Specify a positive integer for timeout.')
+ sys.exit(1)
+
configuration.load_test_config()
set_dags_paused_state(False)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services