You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/02 08:35:42 UTC

[GitHub] milton0825 closed pull request #3478: [PTAL][AIRFLOW-2571] airflow backfill progress bar

milton0825 closed pull request #3478: [PTAL][AIRFLOW-2571] airflow backfill progress bar
URL: https://github.com/apache/incubator-airflow/pull/3478
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index b56e325327..949a18c419 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -215,6 +215,7 @@ def backfill(args, dag=None):
             verbose=args.verbose,
             conf=run_conf,
             rerun_failed_tasks=args.rerun_failed_tasks,
+            show_progress_bar=args.progress_bar,
         )
 
 
@@ -1388,7 +1389,12 @@ class CLIFactory(object):
                 "all the failed tasks for the backfill date range "
                 "instead of throwing exceptions"),
             "store_true"),
-
+        'progress_bar': Arg(
+            ("--progress_bar", "-pbar"),
+            (
+                "Show backfill progress"
+            ),
+            "store_true"),
         # list_tasks
         'tree': Arg(("-t", "--tree"), "Tree view", "store_true"),
         # list_dags
@@ -1716,7 +1722,7 @@ class CLIFactory(object):
                 'mark_success', 'local', 'donot_pickle',
                 'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
                 'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose', 'conf',
-                'reset_dag_run', 'rerun_failed_tasks',
+                'reset_dag_run', 'rerun_failed_tasks', 'progress_bar',
             )
         }, {
             'func': list_tasks,
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 6cfd2d3769..b3d2b18be7 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -56,8 +56,14 @@ def execute_command(command):
     log.info("Executing command in Celery: %s", command)
     env = os.environ.copy()
     try:
-        subprocess.check_call(command, shell=True, stderr=subprocess.STDOUT,
-                              close_fds=True, env=env)
+        output = subprocess.check_output(
+            command,
+            shell=True,
+            close_fds=True,
+            stderr=subprocess.STDOUT,
+            env=env,
+        )
+        log.info(output)
     except subprocess.CalledProcessError as e:
         log.exception('execute_command encountered a CalledProcessError')
         log.error(e.output)
diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
index a6ba677f8b..26b90f81d7 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -64,7 +64,13 @@ def execute_async(self, key, command, queue=None, executor_config=None):
             )
 
         def airflow_run():
-            return subprocess.check_call(command, shell=True, close_fds=True)
+            output = subprocess.check_output(
+                command,
+                shell=True,
+                close_fds=True,
+            )
+            self.log.info(output)
+            return 0
 
         future = self.client.submit(airflow_run, pure=False)
         self.futures[future] = key
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 0c85262324..0cdaaff026 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -84,7 +84,12 @@ def execute_work(self, key, command):
         self.log.info("%s running %s", self.__class__.__name__, command)
         command = "exec bash -c '{0}'".format(command)
         try:
-            subprocess.check_call(command, shell=True, close_fds=True)
+            output = subprocess.check_output(
+                command,
+                shell=True,
+                close_fds=True,
+            )
+            self.log.info(output)
             state = State.SUCCESS
         except subprocess.CalledProcessError as e:
             state = State.FAILED
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index 9c0d8ecf0c..f76217014a 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -45,7 +45,12 @@ def sync(self):
             self.log.info("Executing command: %s", command)
 
             try:
-                subprocess.check_call(command, shell=True, close_fds=True)
+                output = subprocess.check_output(
+                    command,
+                    shell=True,
+                    close_fds=True,
+                )
+                self.log.info(output)
                 self.change_state(key, State.SUCCESS)
             except subprocess.CalledProcessError as e:
                 self.change_state(key, State.FAILED)
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ad114abda3..a4691d67fa 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -43,12 +43,13 @@
 from sqlalchemy_utc import UtcDateTime
 from tabulate import tabulate
 from time import sleep
+from tqdm import tqdm
 
 from airflow import configuration as conf
 from airflow import executors, models, settings
 from airflow.exceptions import AirflowException
 from airflow.models import DAG, DagRun
-from airflow.settings import Stats
+from airflow.settings import Stats, PROGRESS_BAR_FORMAT
 from airflow.task.task_runner import get_task_runner
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
 from airflow.utils import asciiart, helpers, timezone
@@ -59,7 +60,11 @@
                                           list_py_file_paths)
 from airflow.utils.db import create_session, provide_session
 from airflow.utils.email import send_email
-from airflow.utils.log.logging_mixin import LoggingMixin, set_context, StreamLogWriter
+from airflow.utils.log.logging_mixin import (LoggingMixin,
+                                             set_context,
+                                             StreamLogWriter,
+                                             redirect_stdout_tqdm,
+                                             redirect_stderr_tqdm)
 from airflow.utils.state import State
 from airflow.utils.configuration import tmp_configuration_copy
 from airflow.utils.net import get_hostname
@@ -1958,6 +1963,7 @@ def __init__(
             verbose=False,
             conf=None,
             rerun_failed_tasks=False,
+            show_progress_bar=False,
             *args, **kwargs):
         """
         :param dag: DAG object.
@@ -1984,6 +1990,8 @@ def __init__(
         :param rerun_failed_tasks: flag to whether to
                                    auto rerun the failed task in backfill
         :type rerun_failed_tasks: bool
+        :param show_progress_bar: show progress bar
+        :type show_progress_bar: bool
         :param args:
         :param kwargs:
         """
@@ -2000,23 +2008,28 @@ def __init__(
         self.verbose = verbose
         self.conf = conf
         self.rerun_failed_tasks = rerun_failed_tasks
+        self.show_progress_bar = show_progress_bar
         super(BackfillJob, self).__init__(*args, **kwargs)
 
-    def _update_counters(self, ti_status):
+    def _update_counters(self, ti_status, tasks_pbar):
         """
         Updates the counters per state of the tasks that were running. Can re-add
         to tasks to run in case required.
         :param ti_status: the internal status of the backfill job tasks
         :type ti_status: BackfillJob._DagRunTaskStatus
+        :param tasks_pbar: completed tasks progress bar
+        :type tasks_pbar tqdm
         """
         for key, ti in list(ti_status.running.items()):
             ti.refresh_from_db()
             if ti.state == State.SUCCESS:
+                tasks_pbar.update()
                 ti_status.succeeded.add(key)
                 self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
                 ti_status.running.pop(key)
                 continue
             elif ti.state == State.SKIPPED:
+                tasks_pbar.update()
                 ti_status.skipped.add(key)
                 self.log.debug("Task instance %s skipped. Don't rerun.", ti)
                 ti_status.running.pop(key)
@@ -2196,11 +2209,16 @@ def _log_progress(self, ti_status):
         )
 
     @provide_session
-    def _process_backfill_task_instances(self,
-                                         ti_status,
-                                         executor,
-                                         pickle_id,
-                                         start_date=None, session=None):
+    def _process_backfill_task_instances(
+        self,
+        ti_status,
+        executor,
+        pickle_id,
+        dagruns_pbar,
+        tasks_pbar,
+        start_date=None,
+        session=None,
+    ):
         """
         Process a set of task instances from a set of dag runs. Special handling is done
         to account for different task instance states that could be present when running
@@ -2211,6 +2229,10 @@ def _process_backfill_task_instances(self,
         :type executor: BaseExecutor
         :param pickle_id: the pickle_id if dag is pickled, None otherwise
         :type pickle_id: int
+        :param dagruns_pbar: completed DAG runs progress bar
+        :type dagruns_pbar: tqdm
+        :param tasks_pbar: completed tasks progress bar
+        :type tasks_pbar tqdm
         :param start_date: the start date of the backfill job
         :type start_date: datetime
         :param session: the current session object
@@ -2249,6 +2271,7 @@ def _process_backfill_task_instances(self,
                     # The task was already marked successful or skipped by a
                     # different Job. Don't rerun it.
                     if ti.state == State.SUCCESS:
+                        tasks_pbar.update()
                         ti_status.succeeded.add(key)
                         self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
                         ti_status.to_run.pop(key)
@@ -2256,6 +2279,7 @@ def _process_backfill_task_instances(self,
                             ti_status.running.pop(key)
                         continue
                     elif ti.state == State.SKIPPED:
+                        tasks_pbar.update()
                         ti_status.skipped.add(key)
                         self.log.debug("Task instance %s skipped. Don't rerun.", ti)
                         ti_status.to_run.pop(key)
@@ -2380,7 +2404,7 @@ def _process_backfill_task_instances(self,
             self._manage_executor_state(ti_status.running)
 
             # update the task counters
-            self._update_counters(ti_status=ti_status)
+            self._update_counters(ti_status=ti_status, tasks_pbar=tasks_pbar)
 
             # update dag run state
             _dag_runs = ti_status.active_runs[:]
@@ -2388,6 +2412,7 @@ def _process_backfill_task_instances(self,
                 run.update_state(session=session)
                 if run.state in State.finished():
                     ti_status.finished_runs += 1
+                    dagruns_pbar.update()
                     ti_status.active_runs.remove(run)
                     executed_run_dates.append(run.execution_date)
 
@@ -2436,8 +2461,17 @@ def _collect_errors(self, ti_status, session=None):
         return err
 
     @provide_session
-    def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id,
-                               start_date, session=None):
+    def _execute_for_run_dates(
+        self,
+        run_dates,
+        ti_status,
+        executor,
+        pickle_id,
+        start_date,
+        dagruns_pbar,
+        tasks_pbar,
+        session=None,
+    ):
         """
         Computes the dag runs and their respective task instances for
         the given run dates and executes the task instances.
@@ -2452,6 +2486,10 @@ def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id,
         :type pickle_id: int
         :param start_date: backfill start date
         :type start_date: datetime
+        :param dagruns_pbar: completed DAG runs progress bar
+        :type dagruns_pbar: tqdm
+        :param tasks_pbar: completed tasks progress bar
+        :type tasks_pbar tqdm
         :param session: the current session object
         :type session: Session
         """
@@ -2469,6 +2507,8 @@ def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id,
             ti_status=ti_status,
             executor=executor,
             pickle_id=pickle_id,
+            dagruns_pbar=dagruns_pbar,
+            tasks_pbar=tasks_pbar,
             start_date=start_date,
             session=session)
 
@@ -2506,33 +2546,69 @@ def _execute(self, session=None):
         ti_status.total_runs = len(run_dates)  # total dag runs in backfill
 
         try:
-            remaining_dates = ti_status.total_runs
-            while remaining_dates > 0:
-                dates_to_process = [run_date for run_date in run_dates
-                                    if run_date not in ti_status.executed_dag_run_dates]
-
-                self._execute_for_run_dates(run_dates=dates_to_process,
-                                            ti_status=ti_status,
-                                            executor=executor,
-                                            pickle_id=pickle_id,
-                                            start_date=start_date,
-                                            session=session)
-
-                remaining_dates = (
-                    ti_status.total_runs - len(ti_status.executed_dag_run_dates)
+            with redirect_stdout_tqdm() as origin_stdout, redirect_stderr_tqdm():
+                disable_pbar = not self.show_progress_bar
+                dagruns_pbar = tqdm(
+                    total=ti_status.total_runs,
+                    desc='DAG runs completed',
+                    position=1,
+                    dynamic_ncols=True,
+                    ncols='20',
+                    file=origin_stdout,
+                    disable=disable_pbar,
+                    bar_format=PROGRESS_BAR_FORMAT,
                 )
-                err = self._collect_errors(ti_status=ti_status, session=session)
-                if err:
-                    raise AirflowException(err)
+                dagruns_pbar.refresh()
+
+                total_tasks = len(self.dag.tasks) * ti_status.total_runs
+                tasks_pbar = tqdm(
+                    total=total_tasks,
+                    desc='Tasks completed   ',
+                    position=2,
+                    dynamic_ncols=True,
+                    ncols='20',
+                    file=origin_stdout,
+                    disable=disable_pbar,
+                    bar_format=PROGRESS_BAR_FORMAT,
+                )
+                tasks_pbar.refresh()
+
+                remaining_dates = ti_status.total_runs
+                while remaining_dates > 0:
+                    dates_to_process = [
+                        run_date for run_date in run_dates
+                        if run_date not in ti_status.executed_dag_run_dates
+                    ]
+
+                    self._execute_for_run_dates(
+                        run_dates=dates_to_process,
+                        ti_status=ti_status,
+                        executor=executor,
+                        pickle_id=pickle_id,
+                        start_date=start_date,
+                        session=session,
+                        dagruns_pbar=dagruns_pbar,
+                        tasks_pbar=tasks_pbar,
+                    )
 
-                if remaining_dates > 0:
-                    self.log.info(
-                        "max_active_runs limit for dag %s has been reached "
-                        " - waiting for other dag runs to finish",
-                        self.dag_id
+                    remaining_dates = (
+                        ti_status.total_runs - len(ti_status.executed_dag_run_dates)
                     )
-                    time.sleep(self.delay_on_limit_secs)
+                    err = self._collect_errors(ti_status=ti_status, session=session)
+                    if err:
+                        raise AirflowException(err)
+
+                    if remaining_dates > 0:
+                        self.log.info(
+                            "max_active_runs limit for dag %s has been reached "
+                            " - waiting for other dag runs to finish",
+                            self.dag_id
+                        )
+                        time.sleep(self.delay_on_limit_secs)
         finally:
+            dagruns_pbar.close()
+            tasks_pbar.close()
+            print('\n\n')
             executor.end()
             session.commit()
 
diff --git a/airflow/models.py b/airflow/models.py
index 704dc808cf..818c987271 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4016,6 +4016,7 @@ def run(
             verbose=False,
             conf=None,
             rerun_failed_tasks=False,
+            show_progress_bar=False,
     ):
         """
         Runs the DAG.
@@ -4046,6 +4047,8 @@ def run(
         :type verbose: boolean
         :param conf: user defined dictionary passed from CLI
         :type conf: dict
+        :param show_progress_bar: show progress bar
+        :type show_progress_bar: bool
         """
         from airflow.jobs import BackfillJob
         if not executor and local:
@@ -4066,6 +4069,7 @@ def run(
             verbose=verbose,
             conf=conf,
             rerun_failed_tasks=rerun_failed_tasks,
+            show_progress_bar=show_progress_bar,
         )
         job.run()
 
diff --git a/airflow/settings.py b/airflow/settings.py
index 7c0376d12f..4543ad333a 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -50,6 +50,12 @@
     pass
 log.info("Configured default timezone %s" % TIMEZONE)
 
+PROGRESS_BAR_FORMAT = (
+    '{desc}: {n_fmt}/{total_fmt} {percentage:3.0f}%'
+    '|{bar}|'
+    ' [{elapsed}<{remaining},{rate_fmt}{postfix}]'
+)
+
 
 class DummyStatsLogger(object):
     @classmethod
diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py
index 3f696931c9..7febab2118 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,6 +31,7 @@
 from builtins import object
 from contextlib import contextmanager
 from logging import Handler, StreamHandler
+from tqdm import tqdm
 
 
 class LoggingMixin(object):
@@ -176,3 +177,36 @@ def set_context(logger, value):
             _logger = _logger.parent
         else:
             _logger = None
+
+
+class TqdmWriter(object):
+
+    def __init__(self, file):
+        self.file = file
+
+    def write(self, x):
+        # Avoid print() second call (useless \n)
+        if len(x.rstrip()) > 0:
+            tqdm.write(x, file=self.file)
+
+    def flush(self):
+        return getattr(self.file, "flush", lambda: None)()
+
+
+@contextmanager
+def redirect_stdout_tqdm():
+    origin_stdout = sys.stdout
+    try:
+        sys.stdout = TqdmWriter(sys.stdout)
+        yield origin_stdout
+    finally:
+        sys.stdout = sys.__stdout__
+
+
+@contextmanager
+def redirect_stderr_tqdm():
+    try:
+        sys.stderr = TqdmWriter(sys.stderr)
+        yield
+    finally:
+        sys.stderr = sys.__stderr__
diff --git a/setup.py b/setup.py
index fcae019d93..556dbaf757 100644
--- a/setup.py
+++ b/setup.py
@@ -281,6 +281,7 @@ def do_setup():
             'tabulate>=0.7.5, <0.8.0',
             'tenacity==4.8.0',
             'thrift>=0.9.2',
+            'tqdm==4.23.4',
             'tzlocal>=1.4',
             'unicodecsv>=0.14.1',
             'werkzeug>=0.14.1, <0.15.0',
diff --git a/tests/jobs.py b/tests/jobs.py
index f534b6540a..44f0fc568a 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -902,7 +902,8 @@ def test_backfill_execute_subdag_with_removed_task(self):
         subdag.clear()
         dag.clear()
 
-    def test_update_counters(self):
+    @mock.patch('tqdm.tqdm')
+    def test_update_counters(self, pbar_mock):
         dag = DAG(
             dag_id='test_manage_executor_state',
             start_date=DEFAULT_DATE)
@@ -928,7 +929,7 @@ def test_update_counters(self):
         # test for success
         ti.set_state(State.SUCCESS, session)
         ti_status.running[ti.key] = ti
-        job._update_counters(ti_status=ti_status)
+        job._update_counters(ti_status=ti_status, tasks_pbar=pbar_mock)
         self.assertTrue(len(ti_status.running) == 0)
         self.assertTrue(len(ti_status.succeeded) == 1)
         self.assertTrue(len(ti_status.skipped) == 0)
@@ -940,7 +941,7 @@ def test_update_counters(self):
         # test for skipped
         ti.set_state(State.SKIPPED, session)
         ti_status.running[ti.key] = ti
-        job._update_counters(ti_status=ti_status)
+        job._update_counters(ti_status=ti_status, tasks_pbar=pbar_mock)
         self.assertTrue(len(ti_status.running) == 0)
         self.assertTrue(len(ti_status.succeeded) == 0)
         self.assertTrue(len(ti_status.skipped) == 1)
@@ -952,7 +953,7 @@ def test_update_counters(self):
         # test for failed
         ti.set_state(State.FAILED, session)
         ti_status.running[ti.key] = ti
-        job._update_counters(ti_status=ti_status)
+        job._update_counters(ti_status=ti_status, tasks_pbar=pbar_mock)
         self.assertTrue(len(ti_status.running) == 0)
         self.assertTrue(len(ti_status.succeeded) == 0)
         self.assertTrue(len(ti_status.skipped) == 0)
@@ -965,7 +966,7 @@ def test_update_counters(self):
         # test for failed
         ti.set_state(State.NONE, session)
         ti_status.running[ti.key] = ti
-        job._update_counters(ti_status=ti_status)
+        job._update_counters(ti_status=ti_status, tasks_pbar=pbar_mock)
         self.assertTrue(len(ti_status.running) == 0)
         self.assertTrue(len(ti_status.succeeded) == 0)
         self.assertTrue(len(ti_status.skipped) == 0)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services