Posted to commits@airflow.apache.org by fo...@apache.org on 2018/01/28 19:41:21 UTC

incubator-airflow git commit: [AIRFLOW-2015] Add flag for interactive runs

Repository: incubator-airflow
Updated Branches:
  refs/heads/master a1d555177 -> efd8338dc


[AIRFLOW-2015] Add flag for interactive runs

We capture the standard output and error streams so that they're
handled by the configured logger. However, when developing DAGs or
Airflow code itself, it is sometimes useful to put pdb breakpoints in
code triggered by an `airflow run`. Such a flow of course requires
that the output and error streams not be redirected to the logger.

This patch enables that by adding a flag to the `airflow run`
subcommand. Note that this does not require `--local`.
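
For example, with the new flag one can drop into a pdb session from a
task callable. A minimal sketch, assuming a hypothetical DAG file (the
DAG and task names below are illustrative only; the flag itself is
taken from the diff that follows):

    # dags/debug_example.py -- hypothetical DAG, for illustration only
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator


    def troublesome_callable():
        # The prompt below is only usable when stdout/stderr stay
        # attached to the terminal, i.e. with --interactive.
        import pdb
        pdb.set_trace()
        return 'done'


    dag = DAG('debug_example',
              start_date=datetime(2018, 1, 1),
              schedule_interval=None)

    task = PythonOperator(
        task_id='troublesome_task',
        python_callable=troublesome_callable,
        dag=dag)

Invoked as `airflow run debug_example troublesome_task 2018-01-01
--interactive`, the breakpoint drops into a usable pdb prompt instead
of having its I/O swallowed by the task logger.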

Closes #2957 from yati-sagade/ysagade/airflow-2015


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/efd8338d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/efd8338d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/efd8338d

Branch: refs/heads/master
Commit: efd8338dc8bcf50a9d0ace7659650e85d6b49305
Parents: a1d5551
Author: Yati Sagade <ys...@fastmail.nl>
Authored: Sun Jan 28 20:41:14 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Sun Jan 28 20:41:14 2018 +0100

----------------------------------------------------------------------
 airflow/bin/cli.py | 120 ++++++++++++++++++++++++++----------------------
 1 file changed, 65 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efd8338d/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index b032729..d0c11d3 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -324,6 +324,58 @@ def set_is_paused(is_paused, args, dag=None):
     print(msg)
 
 
+def _run(args, dag, ti):
+    if args.local:
+        run_job = jobs.LocalTaskJob(
+            task_instance=ti,
+            mark_success=args.mark_success,
+            pickle_id=args.pickle,
+            ignore_all_deps=args.ignore_all_dependencies,
+            ignore_depends_on_past=args.ignore_depends_on_past,
+            ignore_task_deps=args.ignore_dependencies,
+            ignore_ti_state=args.force,
+            pool=args.pool)
+        run_job.run()
+    elif args.raw:
+        ti._run_raw_task(
+            mark_success=args.mark_success,
+            job_id=args.job_id,
+            pool=args.pool,
+        )
+    else:
+        pickle_id = None
+        if args.ship_dag:
+            try:
+                # Running remotely, so pickling the DAG
+                session = settings.Session()
+                pickle = DagPickle(dag)
+                session.add(pickle)
+                session.commit()
+                pickle_id = pickle.id
+                # TODO: This should be written to a log
+                print('Pickled dag {dag} as pickle_id:{pickle_id}'
+                      .format(**locals()))
+            except Exception as e:
+                print('Could not pickle the DAG')
+                print(e)
+                raise e
+
+        executor = GetDefaultExecutor()
+        executor.start()
+        print("Sending to executor.")
+        executor.queue_task_instance(
+            ti,
+            mark_success=args.mark_success,
+            pickle_id=pickle_id,
+            ignore_all_deps=args.ignore_all_dependencies,
+            ignore_depends_on_past=args.ignore_depends_on_past,
+            ignore_task_deps=args.ignore_dependencies,
+            ignore_ti_state=args.force,
+            pool=args.pool)
+        executor.heartbeat()
+        executor.end()
+
+
 def run(args, dag=None):
     # Disable connection pooling to reduce the # of connections on the DB
     # while it's waiting for the task to finish.
@@ -368,60 +420,13 @@ def run(args, dag=None):
     hostname = socket.getfqdn()
     log.info("Running %s on host %s", ti, hostname)
 
-    with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
-        if args.local:
-            run_job = jobs.LocalTaskJob(
-                task_instance=ti,
-                mark_success=args.mark_success,
-                pickle_id=args.pickle,
-                ignore_all_deps=args.ignore_all_dependencies,
-                ignore_depends_on_past=args.ignore_depends_on_past,
-                ignore_task_deps=args.ignore_dependencies,
-                ignore_ti_state=args.force,
-                pool=args.pool)
-            run_job.run()
-        elif args.raw:
-            ti._run_raw_task(
-                mark_success=args.mark_success,
-                job_id=args.job_id,
-                pool=args.pool,
-            )
-        else:
-            pickle_id = None
-            if args.ship_dag:
-                try:
-                    # Running remotely, so pickling the DAG
-                    session = settings.Session()
-                    pickle = DagPickle(dag)
-                    session.add(pickle)
-                    session.commit()
-                    pickle_id = pickle.id
-                    # TODO: This should be written to a log
-                    print((
-                              'Pickled dag {dag} '
-                              'as pickle_id:{pickle_id}').format(**locals()))
-                except Exception as e:
-                    print('Could not pickle the DAG')
-                    print(e)
-                    raise e
-
-            executor = GetDefaultExecutor()
-            executor.start()
-            print("Sending to executor.")
-            executor.queue_task_instance(
-                ti,
-                mark_success=args.mark_success,
-                pickle_id=pickle_id,
-                ignore_all_deps=args.ignore_all_dependencies,
-                ignore_depends_on_past=args.ignore_depends_on_past,
-                ignore_task_deps=args.ignore_dependencies,
-                ignore_ti_state=args.force,
-                pool=args.pool)
-            executor.heartbeat()
-            executor.end()
-
-    logging.shutdown()
-
+    if args.interactive:
+        _run(args, dag, ti)
+    else:
+        with redirect_stdout(ti.log, logging.INFO),\
+                redirect_stderr(ti.log, logging.WARN):
+            _run(args, dag, ti)
+        logging.shutdown()
 
 def task_failed_deps(args):
     """
@@ -1281,6 +1286,11 @@ class CLIFactory(object):
         # dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and
         # the "ignore_all_dependencies" command should be called the"force" command
         # instead.
+        'interactive': Arg(
+            ('-int', '--interactive'),
+            help='Do not capture standard output and error streams '
+                 '(useful for interactive debugging)',
+            action='store_true'),
         'force': Arg(
             ("-f", "--force"),
             "Ignore previous task instance state, rerun regardless if task already "
@@ -1529,7 +1539,7 @@ class CLIFactory(object):
                 'dag_id', 'task_id', 'execution_date', 'subdir',
                 'mark_success', 'force', 'pool', 'cfg_path',
                 'local', 'raw', 'ignore_all_dependencies', 'ignore_dependencies',
-                'ignore_depends_on_past', 'ship_dag', 'pickle', 'job_id'),
+                'ignore_depends_on_past', 'ship_dag', 'pickle', 'job_id', 'interactive',),
         }, {
             'func': initdb,
             'help': "Initialize the metadata database",