Posted to commits@airflow.apache.org by da...@apache.org on 2017/07/21 01:08:53 UTC
incubator-airflow git commit: [Airflow 1332] Split logs based on try number
Repository: incubator-airflow
Updated Branches:
refs/heads/master b9576d57b -> b49986c3b
[Airflow 1332] Split logs based on try number
This PR splits logs based on try number and adds
tabs to display different task instance tries.
**Note: this PR is a temporary change for
separating task attempts. The code in this PR will
be refactored in the future. Please refer to #2422
for the Airflow logging abstractions redesign.**
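For illustration only (the paths below are placeholders, not taken from
this commit), each attempt now gets its own log file under the base log
folder, named by its 1-indexed try number, and the web UI renders one
tab per attempt:

    <BASE_LOG_FOLDER>/<dag_id>/<task_id>/<execution_date ISO>/1.log   # first attempt
    <BASE_LOG_FOLDER>/<dag_id>/<task_id>/<execution_date ISO>/2.log   # retry or re-run after clear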
Testing:
1. Added unit tests.
2. Tested on localhost.
3. Tested in a production environment with S3 remote
storage, a MySQL database, Redis, one Airflow
scheduler, and two Airflow workers.
Closes #2383 from AllisonWang/allison--add-task-attempt
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b49986c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b49986c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b49986c3
Branch: refs/heads/master
Commit: b49986c3b24a5382f817d5a3fc40add87b464ba2
Parents: b9576d5
Author: AllisonWang <al...@gmail.com>
Authored: Thu Jul 20 18:08:15 2017 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Thu Jul 20 18:08:18 2017 -0700
----------------------------------------------------------------------
airflow/bin/cli.py | 81 +++++------
airflow/models.py | 50 ++++---
airflow/utils/logging.py | 66 +++++++++
airflow/www/templates/airflow/ti_log.html | 40 ++++++
airflow/www/views.py | 180 +++++++++++++------------
dags/test_dag.py | 4 +-
docs/scheduler.rst | 19 ++-
tests/models.py | 137 ++++++++++++++++++-
tests/operators/python_operator.py | 88 +++++++-----
tests/utils/test_dates.py | 4 -
tests/utils/test_logging.py | 29 ++++
11 files changed, 503 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b49986c3/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index f568d5d..a8543d3 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -59,7 +59,6 @@ from airflow.www.app import cached_app
from sqlalchemy import func
from sqlalchemy.orm import exc
-
api.load_auth()
api_module = import_module(conf.get('cli', 'api_client'))
api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
@@ -316,7 +315,7 @@ def run(args, dag=None):
# Load custom airflow config
if args.cfg_path:
with open(args.cfg_path, 'r') as conf_file:
- conf_dict = json.load(conf_file)
+ conf_dict = json.load(conf_file)
if os.path.exists(args.cfg_path):
os.remove(args.cfg_path)
@@ -327,6 +326,21 @@ def run(args, dag=None):
settings.configure_vars()
settings.configure_orm()
+ if not args.pickle and not dag:
+ dag = get_dag(args)
+ elif not dag:
+ session = settings.Session()
+ logging.info('Loading pickle id {args.pickle}'.format(args=args))
+ dag_pickle = session.query(
+ DagPickle).filter(DagPickle.id == args.pickle).first()
+ if not dag_pickle:
+ raise AirflowException("Who hid the pickle!? [missing pickle]")
+ dag = dag_pickle.pickle
+
+ task = dag.get_task(task_id=args.task_id)
+ ti = TaskInstance(task, args.execution_date)
+ ti.refresh_from_db()
+
logging.root.handlers = []
if args.raw:
# Output to STDOUT for the parent process to read and log
@@ -350,19 +364,23 @@ def run(args, dag=None):
# writable by both users, then it's possible that re-running a task
# via the UI (or vice versa) results in a permission error as the task
# tries to write to a log file created by the other user.
+ try_number = ti.try_number
log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
- directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
+ log_relative_dir = logging_utils.get_log_directory(args.dag_id, args.task_id,
+ args.execution_date)
+ directory = os.path.join(log_base, log_relative_dir)
# Create the log file and give it group writable permissions
# TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
# operator is not compatible with impersonation (e.g. if a Celery executor is used
# for a SubDag operator and the SubDag operator has a different owner than the
# parent DAG)
- if not os.path.exists(directory):
+ if not os.path.isdir(directory):
# Create the directory as globally writable using custom mkdirs
# as os.makedirs doesn't set mode properly.
mkdirs(directory, 0o775)
- iso = args.execution_date.isoformat()
- filename = "{directory}/{iso}".format(**locals())
+ log_relative = logging_utils.get_log_filename(
+ args.dag_id, args.task_id, args.execution_date, try_number)
+ filename = os.path.join(log_base, log_relative)
if not os.path.exists(filename):
open(filename, "a").close()
@@ -376,21 +394,6 @@ def run(args, dag=None):
hostname = socket.getfqdn()
logging.info("Running on host {}".format(hostname))
- if not args.pickle and not dag:
- dag = get_dag(args)
- elif not dag:
- session = settings.Session()
- logging.info('Loading pickle id {args.pickle}'.format(**locals()))
- dag_pickle = session.query(
- DagPickle).filter(DagPickle.id == args.pickle).first()
- if not dag_pickle:
- raise AirflowException("Who hid the pickle!? [missing pickle]")
- dag = dag_pickle.pickle
- task = dag.get_task(task_id=args.task_id)
-
- ti = TaskInstance(task, args.execution_date)
- ti.refresh_from_db()
-
if args.local:
print("Logging into: " + filename)
run_job = jobs.LocalTaskJob(
@@ -424,8 +427,8 @@ def run(args, dag=None):
session.commit()
pickle_id = pickle.id
print((
- 'Pickled dag {dag} '
- 'as pickle_id:{pickle_id}').format(**locals()))
+ 'Pickled dag {dag} '
+ 'as pickle_id:{pickle_id}').format(**locals()))
except Exception as e:
print('Could not pickle the DAG')
print(e)
@@ -475,7 +478,8 @@ def run(args, dag=None):
with open(filename, 'r') as logfile:
log = logfile.read()
- remote_log_location = filename.replace(log_base, remote_base)
+ remote_log_location = os.path.join(remote_base, log_relative)
+ logging.debug("Uploading to remote log location {}".format(remote_log_location))
# S3
if remote_base.startswith('s3:/'):
logging_utils.S3Log().write(log, remote_log_location)
@@ -669,10 +673,10 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
gunicorn_master_proc.send_signal(signal.SIGTTIN)
excess += 1
wait_until_true(lambda: num_workers_expected + excess ==
- get_num_workers_running(gunicorn_master_proc))
+ get_num_workers_running(gunicorn_master_proc))
wait_until_true(lambda: num_workers_expected ==
- get_num_workers_running(gunicorn_master_proc))
+ get_num_workers_running(gunicorn_master_proc))
while True:
num_workers_running = get_num_workers_running(gunicorn_master_proc)
@@ -695,7 +699,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
gunicorn_master_proc.send_signal(signal.SIGTTOU)
excess -= 1
wait_until_true(lambda: num_workers_expected + excess ==
- get_num_workers_running(gunicorn_master_proc))
+ get_num_workers_running(gunicorn_master_proc))
# Start a new worker by asking gunicorn to increase number of workers
elif num_workers_running == num_workers_expected:
@@ -887,6 +891,7 @@ def serve_logs(args):
filename,
mimetype="application/json",
as_attachment=False)
+
WORKER_LOG_SERVER_PORT = \
int(conf.get('celery', 'WORKER_LOG_SERVER_PORT'))
flask_app.run(
@@ -947,8 +952,8 @@ def initdb(args): # noqa
def resetdb(args):
print("DB: " + repr(settings.engine.url))
if args.yes or input(
- "This will drop existing tables if they exist. "
- "Proceed? (y/n)").upper() == "Y":
+ "This will drop existing tables if they exist. "
+ "Proceed? (y/n)").upper() == "Y":
logging.basicConfig(level=settings.LOGGING_LEVEL,
format=settings.SIMPLE_LOG_FORMAT)
db_utils.resetdb()
@@ -966,7 +971,7 @@ def upgradedb(args): # noqa
if not ds_rows:
qry = (
session.query(DagRun.dag_id, DagRun.state, func.count('*'))
- .group_by(DagRun.dag_id, DagRun.state)
+ .group_by(DagRun.dag_id, DagRun.state)
)
for dag_id, state, count in qry:
session.add(DagStat(dag_id=dag_id, state=state, count=count))
@@ -1065,8 +1070,8 @@ def connections(args):
session = settings.Session()
if not (session
- .query(Connection)
- .filter(Connection.conn_id == new_conn.conn_id).first()):
+ .query(Connection)
+ .filter(Connection.conn_id == new_conn.conn_id).first()):
session.add(new_conn)
session.commit()
msg = '\n\tSuccessfully added `conn_id`={conn_id} : {uri}\n'
@@ -1168,16 +1173,16 @@ class CLIFactory(object):
'dry_run': Arg(
("-dr", "--dry_run"), "Perform a dry run", "store_true"),
'pid': Arg(
- ("--pid", ), "PID file location",
+ ("--pid",), "PID file location",
nargs='?'),
'daemon': Arg(
("-D", "--daemon"), "Daemonize instead of running "
"in the foreground",
"store_true"),
'stderr': Arg(
- ("--stderr", ), "Redirect stderr to this file"),
+ ("--stderr",), "Redirect stderr to this file"),
'stdout': Arg(
- ("--stdout", ), "Redirect stdout to this file"),
+ ("--stdout",), "Redirect stdout to this file"),
'log_file': Arg(
("-l", "--log-file"), "Location of the log file"),
@@ -1333,7 +1338,7 @@ class CLIFactory(object):
"Serialized pickle object of the entire dag (used internally)"),
'job_id': Arg(("-j", "--job_id"), argparse.SUPPRESS),
'cfg_path': Arg(
- ("--cfg_path", ), "Path to config file to use instead of airflow.cfg"),
+ ("--cfg_path",), "Path to config file to use instead of airflow.cfg"),
# webserver
'port': Arg(
("-p", "--port"),
@@ -1341,11 +1346,11 @@ class CLIFactory(object):
type=int,
help="The port on which to run the server"),
'ssl_cert': Arg(
- ("--ssl_cert", ),
+ ("--ssl_cert",),
default=conf.get('webserver', 'WEB_SERVER_SSL_CERT'),
help="Path to the SSL certificate for the webserver"),
'ssl_key': Arg(
- ("--ssl_key", ),
+ ("--ssl_key",),
default=conf.get('webserver', 'WEB_SERVER_SSL_KEY'),
help="Path to the key to use with the SSL certificate"),
'workers': Arg(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b49986c3/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 32ad144..c1fd4a3 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -108,7 +108,7 @@ else:
_CONTEXT_MANAGER_DAG = None
-def clear_task_instances(tis, session, activate_dag_runs=True):
+def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
"""
Clears a set of task instances, but makes sure the running ones
get killed.
@@ -119,12 +119,20 @@ def clear_task_instances(tis, session, activate_dag_runs=True):
if ti.job_id:
ti.state = State.SHUTDOWN
job_ids.append(ti.job_id)
- # todo: this creates an issue with the webui tests
- # elif ti.state != State.REMOVED:
- # ti.state = State.NONE
- # session.merge(ti)
else:
- session.delete(ti)
+ task_id = ti.task_id
+ if dag and dag.has_task(task_id):
+ task = dag.get_task(task_id)
+ task_retries = task.retries
+ ti.max_tries = ti.try_number + task_retries
+ else:
+ # Ignore errors when updating max_tries if dag is None or
+ # task not found in dag since database records could be
+ # outdated. We make max_tries the maximum value of its
+ # original max_tries or the current task try number.
+ ti.max_tries = max(ti.max_tries, ti.try_number)
+ ti.state = State.NONE
+ session.merge(ti)
if job_ids:
from airflow.jobs import BaseJob as BJ
@@ -1316,8 +1324,8 @@ class TaskInstance(Base):
# not 0-indexed lists (i.e. Attempt 1 instead of
# Attempt 0 for the first attempt).
msg = "Starting attempt {attempt} of {total}".format(
- attempt=self.try_number % (task.retries + 1) + 1,
- total=task.retries + 1)
+ attempt=self.try_number + 1,
+ total=self.max_tries + 1)
self.start_date = datetime.now()
dep_context = DepContext(
@@ -1338,8 +1346,8 @@ class TaskInstance(Base):
self.state = State.NONE
msg = ("FIXME: Rescheduling due to concurrency limits reached at task "
"runtime. Attempt {attempt} of {total}. State set to NONE.").format(
- attempt=self.try_number % (task.retries + 1) + 1,
- total=task.retries + 1)
+ attempt=self.try_number + 1,
+ total=self.max_tries + 1)
logging.warning(hr + msg + hr)
self.queued_dttm = datetime.now()
@@ -1486,7 +1494,11 @@ class TaskInstance(Base):
# Let's go deeper
try:
- if task.retries and self.try_number % (task.retries + 1) != 0:
+ # try_number is incremented by 1 during task instance run. So the
+ # current task instance try_number is the try_number for the next
+ # task instance run. We only mark task instance as FAILED if the
+ # next task instance try_number exceeds the max_tries.
+ if task.retries and self.try_number <= self.max_tries:
self.state = State.UP_FOR_RETRY
logging.info('Marking task as UP_FOR_RETRY')
if task.email_on_retry and task.email:
@@ -1641,15 +1653,17 @@ class TaskInstance(Base):
task = self.task
title = "Airflow alert: {self}".format(**locals())
exception = str(exception).replace('\n', '<br>')
- try_ = task.retries + 1
+ # For reporting purposes, we report based on 1-indexed,
+ # not 0-indexed lists (i.e. Try 1 instead of
+ # Try 0 for the first attempt).
body = (
- "Try {self.try_number} out of {try_}<br>"
+ "Try {try_number} out of {max_tries}<br>"
"Exception:<br>{exception}<br>"
"Log: <a href='{self.log_url}'>Link</a><br>"
"Host: {self.hostname}<br>"
"Log file: {self.log_filepath}<br>"
"Mark success: <a href='{self.mark_success_url}'>Link</a><br>"
- ).format(**locals())
+ ).format(try_number=self.try_number + 1, max_tries=self.max_tries + 1, **locals())
send_email(task.email, title, body)
def set_duration(self):
@@ -2382,9 +2396,7 @@ class BaseOperator(object):
def downstream_task_ids(self):
return self._downstream_task_ids
- def clear(
- self, start_date=None, end_date=None,
- upstream=False, downstream=False):
+ def clear(self, start_date=None, end_date=None, upstream=False, downstream=False):
"""
Clears the state of task instances associated with the task, following
the parameters specified.
@@ -2413,7 +2425,7 @@ class BaseOperator(object):
count = qry.count()
- clear_task_instances(qry.all(), session)
+ clear_task_instances(qry.all(), session, dag=self.dag)
session.commit()
session.close()
@@ -3244,7 +3256,7 @@ class DAG(BaseDag, LoggingMixin):
do_it = utils.helpers.ask_yesno(question)
if do_it:
- clear_task_instances(tis.all(), session)
+ clear_task_instances(tis.all(), session, dag=self)
if reset_dag_runs:
self.set_dag_runs_state(session=session)
else:
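A minimal sketch of the new retry accounting (modeled on the
ClearTasksTest added in tests/models.py further down; the dag and task
ids are placeholders):

    import datetime
    from airflow import settings
    from airflow.models import DAG, TaskInstance, clear_task_instances
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG('clear_example', start_date=datetime.datetime(2017, 1, 1))
    task = DummyOperator(task_id='example_task', owner='airflow', dag=dag, retries=2)
    ti = TaskInstance(task=task, execution_date=datetime.datetime(2017, 1, 1))

    ti.run()                               # try_number: 0 -> 1, max_tries stays 2
    session = settings.Session()
    clear_task_instances([ti], session, dag=dag)  # state -> None, max_tries -> 1 + retries = 3
    session.commit()
    ti.refresh_from_db()
    # The next run is reported 1-indexed: "Starting attempt 2 of 4".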
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b49986c3/airflow/utils/logging.py
----------------------------------------------------------------------
diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py
index 96767cb..b86d839 100644
--- a/airflow/utils/logging.py
+++ b/airflow/utils/logging.py
@@ -19,7 +19,9 @@ from __future__ import unicode_literals
from builtins import object
+import dateutil.parser
import logging
+import six
from airflow import configuration
from airflow.exceptions import AirflowException
@@ -57,6 +59,19 @@ class S3Log(object):
'Please make sure that airflow[s3] is installed and '
'the S3 connection exists.'.format(remote_conn_id))
+ def log_exists(self, remote_log_location):
+ """
+ Check if remote_log_location exists in remote storage
+ :param remote_log_location: log's location in remote storage
+ :return: True if location exists else False
+ """
+ if self.hook:
+ try:
+ return self.hook.get_key(remote_log_location) is not None
+ except Exception:
+ pass
+ return False
+
def read(self, remote_log_location, return_error=False):
"""
Returns the log found at the remote_log_location. Returns '' if no
@@ -137,6 +152,20 @@ class GCSLog(object):
'"{}". Please make sure that airflow[gcp_api] is installed '
'and the GCS connection exists.'.format(remote_conn_id))
+ def log_exists(self, remote_log_location):
+ """
+ Check if remote_log_location exists in remote storage
+ :param remote_log_location: log's location in remote storage
+ :return: True if location exists else False
+ """
+ if self.hook:
+ try:
+ bkt, blob = self.parse_gcs_url(remote_log_location)
+ return self.hook.exists(bkt, blob)
+ except Exception:
+ pass
+ return False
+
def read(self, remote_log_location, return_error=False):
"""
Returns the log found at the remote_log_location.
@@ -211,3 +240,40 @@ class GCSLog(object):
bucket = parsed_url.netloc
blob = parsed_url.path.strip('/')
return (bucket, blob)
+
+
+# TODO: get_log_filename and get_log_directory are temporary helper
+# functions to get airflow log filename. Logic of using FileHandler
+# will be extracted out and those two functions will be moved.
+# For more details, please check issue AIRFLOW-1385.
+def get_log_filename(dag_id, task_id, execution_date, try_number):
+ """
+ Return relative log path.
+ :arg dag_id: id of the dag
+ :arg task_id: id of the task
+ :arg execution_date: execution date of the task instance
+ :arg try_number: try_number of current task instance
+ """
+ relative_dir = get_log_directory(dag_id, task_id, execution_date)
+ # For reporting purposes and keeping logs consistent with web UI
+ # display, we report based on 1-indexed, not 0-indexed lists
+ filename = "{}/{}.log".format(relative_dir, try_number+1)
+
+ return filename
+
+
+def get_log_directory(dag_id, task_id, execution_date):
+ """
+ Return log directory path: dag_id/task_id/execution_date
+ :arg dag_id: id of the dag
+ :arg task_id: id of the task
+ :arg execution_date: execution date of the task instance
+ """
+ # execution_date could be parsed in as unicode character
+ # instead of datetime object.
+ if isinstance(execution_date, six.string_types):
+ execution_date = dateutil.parser.parse(execution_date)
+ iso = execution_date.isoformat()
+ relative_dir = '{}/{}/{}'.format(dag_id, task_id, iso)
+
+ return relative_dir
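A minimal usage sketch of the two helpers (the expected outputs match
the unit test added in tests/utils/test_logging.py at the end of this
commit):

    from datetime import datetime
    from airflow.utils import logging as logging_utils

    # execution_date may be a datetime or an ISO-format string; either is accepted.
    logging_utils.get_log_directory('dag_id', 'task_id', '2017-01-01T00:00:00')
    # -> 'dag_id/task_id/2017-01-01T00:00:00'

    # try_number is 0-indexed internally; file names are 1-indexed for display.
    logging_utils.get_log_filename('dag_id', 'task_id', datetime(2017, 1, 1), 0)
    # -> 'dag_id/task_id/2017-01-01T00:00:00/1.log'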
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b49986c3/airflow/www/templates/airflow/ti_log.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/ti_log.html b/airflow/www/templates/airflow/ti_log.html
new file mode 100644
index 0000000..03c0ed3
--- /dev/null
+++ b/airflow/www/templates/airflow/ti_log.html
@@ -0,0 +1,40 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+#}
+{% extends "airflow/task_instance.html" %}
+{% block title %}Airflow - DAGs{% endblock %}
+
+{% block body %}
+ {{ super() }}
+ <h4>{{ title }}</h4>
+ <ul class="nav nav-pills" role="tablist">
+ {% for log in logs %}
+ <li role="presentation" class="{{ 'active' if loop.last else '' }}">
+ <a href="#{{ loop.index }}" aria-controls="{{ loop.index }}" role="tab" data-toggle="tab">
+ {{ loop.index }}
+ </a>
+ </li>
+ {% endfor %}
+ </ul>
+ <div class="tab-content">
+ {% for log in logs %}
+ <div role="tabpanel" class="tab-pane {{ 'active' if loop.last else '' }}" id="{{ loop.index }}">
+ <pre id="attempt-{{ loop.index }}">{{ log }}</pre>
+ </div>
+ {% endfor %}
+ </div>
+{% endblock %}
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b49986c3/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 6c39462..046c2e1 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -67,7 +67,7 @@ from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS
from airflow.models import BaseOperator
from airflow.operators.subdag_operator import SubDagOperator
-from airflow.utils.logging import LoggingMixin
+from airflow.utils.logging import LoggingMixin, get_log_filename
from airflow.utils.json import json_ser
from airflow.utils.state import State
from airflow.utils.db import provide_session
@@ -694,110 +694,112 @@ class Airflow(BaseView):
form=form,
title=title,)
+ def _get_log(self, ti, log_filename):
+ """
+ Get log for a specific try number.
+ :param ti: current task instance
+ :param log_filename: relative filename to fetch the log
+ """
+ # TODO: This is not the best practice. Log handler and
+ # reader should be configurable and separated from the
+ # frontend. The new airflow logging design is in progress.
+ # Please refer to #2422(https://github.com/apache/incubator-airflow/pull/2422).
+ log = ''
+ # Load remote log
+ remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+ remote_log_loaded = False
+ if remote_log_base:
+ remote_log_path = os.path.join(remote_log_base, log_filename)
+ remote_log = ""
+
+ # S3
+ if remote_log_path.startswith('s3:/'):
+ s3_log = log_utils.S3Log()
+ if s3_log.log_exists(remote_log_path):
+ remote_log += s3_log.read(remote_log_path, return_error=True)
+ remote_log_loaded = True
+ # GCS
+ elif remote_log_path.startswith('gs:/'):
+ gcs_log = log_utils.GCSLog()
+ if gcs_log.log_exists(remote_log_path):
+ remote_log += gcs_log.read(remote_log_path, return_error=True)
+ remote_log_loaded = True
+ # unsupported
+ else:
+ remote_log += '*** Unsupported remote log location.'
+
+ if remote_log:
+ log += ('*** Reading remote log from {}.\n{}\n'.format(
+ remote_log_path, remote_log))
+
+ # We only want to display local log if the remote log is not loaded.
+ if not remote_log_loaded:
+ # Load local log
+ local_log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
+ local_log_path = os.path.join(local_log_base, log_filename)
+ if os.path.exists(local_log_path):
+ try:
+ f = open(local_log_path)
+ log += "*** Reading local log.\n" + "".join(f.readlines())
+ f.close()
+ except:
+ log = "*** Failed to load local log file: {0}.\n".format(local_log_path)
+ else:
+ WORKER_LOG_SERVER_PORT = conf.get('celery', 'WORKER_LOG_SERVER_PORT')
+ url = os.path.join(
+ "http://{ti.hostname}:{WORKER_LOG_SERVER_PORT}/log", log_filename
+ ).format(**locals())
+ log += "*** Log file isn't local.\n"
+ log += "*** Fetching here: {url}\n".format(**locals())
+ try:
+ import requests
+ timeout = None # No timeout
+ try:
+ timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
+ except (AirflowConfigException, ValueError):
+ pass
+
+ response = requests.get(url, timeout=timeout)
+ response.raise_for_status()
+ log += '\n' + response.text
+ except:
+ log += "*** Failed to fetch log file from work r.\n".format(
+ **locals())
+
+ if PY2 and not isinstance(log, unicode):
+ log = log.decode('utf-8')
+
+ return log
+
@expose('/log')
@login_required
@wwwutils.action_logging
def log(self):
- BASE_LOG_FOLDER = os.path.expanduser(
- conf.get('core', 'BASE_LOG_FOLDER'))
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
- dag = dagbag.get_dag(dag_id)
- log_relative = "{dag_id}/{task_id}/{execution_date}".format(
- **locals())
- loc = os.path.join(BASE_LOG_FOLDER, log_relative)
- loc = loc.format(**locals())
- log = ""
- TI = models.TaskInstance
dttm = dateutil.parser.parse(execution_date)
form = DateTimeForm(data={'execution_date': dttm})
+ dag = dagbag.get_dag(dag_id)
+ TI = models.TaskInstance
session = Session()
ti = session.query(TI).filter(
- TI.dag_id == dag_id, TI.task_id == task_id,
+ TI.dag_id == dag_id,
+ TI.task_id == task_id,
TI.execution_date == dttm).first()
-
+ logs = []
if ti is None:
- log = "*** Task instance did not exist in the DB\n"
+ logs = ["*** Task instance did not exist in the DB\n"]
else:
- # load remote logs
- remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
- remote_log_loaded = False
- if remote_log_base:
- remote_log_path = os.path.join(remote_log_base, log_relative)
- remote_log = ""
-
- # Only display errors reading the log if the task completed or ran at least
- # once before (otherwise there won't be any remote log stored).
- ti_execution_completed = ti.state in {State.SUCCESS, State.FAILED}
- ti_ran_more_than_once = ti.try_number > 1
- surface_log_retrieval_errors = (
- ti_execution_completed or ti_ran_more_than_once)
-
- # S3
- if remote_log_path.startswith('s3:/'):
- remote_log += log_utils.S3Log().read(
- remote_log_path, return_error=surface_log_retrieval_errors)
- remote_log_loaded = True
- # GCS
- elif remote_log_path.startswith('gs:/'):
- remote_log += log_utils.GCSLog().read(
- remote_log_path, return_error=surface_log_retrieval_errors)
- remote_log_loaded = True
- # unsupported
- else:
- remote_log += '*** Unsupported remote log location.'
-
- if remote_log:
- log += ('*** Reading remote log from {}.\n{}\n'.format(
- remote_log_path, remote_log))
-
- # We only want to display the
- # local logs while the task is running if a remote log configuration is set up
- # since the logs will be transfered there after the run completes.
- # TODO(aoen): One problem here is that if a task is running on a worker it
- # already ran on, then duplicate logs will be printed for all of the previous
- # runs of the task that already completed since they will have been printed as
- # part of the remote log section above. This can be fixed either by streaming
- # logs to the log servers as tasks are running, or by creating a proper
- # abstraction for multiple task instance runs).
- if not remote_log_loaded or ti.state == State.RUNNING:
- if os.path.exists(loc):
- try:
- f = open(loc)
- log += "*** Reading local log.\n" + "".join(f.readlines())
- f.close()
- except:
- log = "*** Failed to load local log file: {0}.\n".format(loc)
- else:
- WORKER_LOG_SERVER_PORT = \
- conf.get('celery', 'WORKER_LOG_SERVER_PORT')
- url = os.path.join(
- "http://{ti.hostname}:{WORKER_LOG_SERVER_PORT}/log", log_relative
- ).format(**locals())
- log += "*** Log file isn't local.\n"
- log += "*** Fetching here: {url}\n".format(**locals())
- try:
- import requests
- timeout = None # No timeout
- try:
- timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
- except (AirflowConfigException, ValueError):
- pass
-
- response = requests.get(url, timeout=timeout)
- response.raise_for_status()
- log += '\n' + response.text
- except:
- log += "*** Failed to fetch log file from worker.\n".format(
- **locals())
-
- if PY2 and not isinstance(log, unicode):
- log = log.decode('utf-8')
+ logs = [''] * ti.try_number
+ for try_number in range(ti.try_number):
+ log_filename = get_log_filename(
+ dag_id, task_id, execution_date, try_number)
+ logs[try_number] += self._get_log(ti, log_filename)
return self.render(
- 'airflow/ti_code.html',
- code=log, dag=dag, title="Log", task_id=task_id,
+ 'airflow/ti_log.html',
+ logs=logs, dag=dag, title="Log by attempts", task_id=task_id,
execution_date=execution_date, form=form)
@expose('/task')
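The log view above reads remote storage first, then the local file, and
finally falls back to fetching from the worker's serve_logs endpoint. A
sketch of the airflow.cfg settings it consults (the key names are the
ones read via conf.get above; the values are illustrative only):

    [core]
    base_log_folder = /home/airflow/logs
    # Empty disables remote reading; an s3:// or gs:// prefix selects the S3Log/GCSLog reader.
    remote_base_log_folder = s3://my-bucket/airflow/logs

    [celery]
    # Port served by `airflow serve_logs` on workers; used when the log file is
    # neither remote nor local.
    worker_log_server_port = 8793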
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b49986c3/dags/test_dag.py
----------------------------------------------------------------------
diff --git a/dags/test_dag.py b/dags/test_dag.py
index f2a9f6a..8dcde15 100644
--- a/dags/test_dag.py
+++ b/dags/test_dag.py
@@ -19,7 +19,7 @@ from datetime import datetime, timedelta
now = datetime.now()
now_to_the_hour = (now - timedelta(0, 0, 0, 0, 0, 3)).replace(minute=0, second=0, microsecond=0)
-START_DATE = now_to_the_hour
+START_DATE = now_to_the_hour
DAG_NAME = 'test_dag_v1'
default_args = {
@@ -34,5 +34,3 @@ run_this_2 = DummyOperator(task_id='run_this_2', dag=dag)
run_this_2.set_upstream(run_this_1)
run_this_3 = DummyOperator(task_id='run_this_3', dag=dag)
run_this_3.set_upstream(run_this_2)
-
-
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b49986c3/docs/scheduler.rst
----------------------------------------------------------------------
diff --git a/docs/scheduler.rst b/docs/scheduler.rst
index 4c5c6be..8029eb0 100644
--- a/docs/scheduler.rst
+++ b/docs/scheduler.rst
@@ -147,8 +147,19 @@ To Keep in Mind
Here are some of the ways you can **unblock tasks**:
-* From the UI, you can **clear** (as in delete the status of) individual task instances from the task instances dialog, while defining whether you want to includes the past/future and the upstream/downstream dependencies. Note that a confirmation window comes next and allows you to see the set you are about to clear.
-* The CLI command ``airflow clear -h`` has lots of options when it comes to clearing task instance states, including specifying date ranges, targeting task_ids by specifying a regular expression, flags for including upstream and downstream relatives, and targeting task instances in specific states (``failed``, or ``success``)
-* Marking task instances as successful can be done through the UI. This is mostly to fix false negatives, or for instance when the fix has been applied outside of Airflow.
-* The ``airflow backfill`` CLI subcommand has a flag to ``--mark_success`` and allows selecting subsections of the DAG as well as specifying date ranges.
+* From the UI, you can **clear** (as in delete the status of) individual task instances
+ from the task instances dialog, while defining whether you want to include the past/future
+ and the upstream/downstream dependencies. Note that a confirmation window comes next and
+ allows you to see the set you are about to clear. You can also clear all task instances
+ associated with the dag.
+* The CLI command ``airflow clear -h`` has lots of options when it comes to clearing task instance
+ states, including specifying date ranges, targeting task_ids by specifying a regular expression,
+ flags for including upstream and downstream relatives, and targeting task instances in specific
+ states (``failed``, or ``success``)
+* Clearing a task instance will no longer delete the task instance record. Instead it updates
+ max_tries and sets the current task instance state to None.
+* Marking task instances as successful can be done through the UI. This is mostly to fix false negatives,
+ or for instance when the fix has been applied outside of Airflow.
+* The ``airflow backfill`` CLI subcommand has a flag to ``--mark_success`` and allows selecting
+ subsections of the DAG as well as specifying date ranges.
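An illustrative ``airflow clear`` invocation covering the options
mentioned above (dag and task names are placeholders; flag spellings
may differ between versions, so check ``airflow clear -h``):

    # Clear failed task instances matching a regex over a date range,
    # including their downstream dependents.
    airflow clear example_dag -t 'transform_.*' -s 2017-07-01 -e 2017-07-07 --downstream --only_failed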
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b49986c3/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 400c659..cf2734b 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -29,6 +29,7 @@ from airflow.jobs import BackfillJob
from airflow.models import DAG, TaskInstance as TI
from airflow.models import State as ST
from airflow.models import DagModel, DagStat
+from airflow.models import clear_task_instances
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
@@ -912,7 +913,7 @@ class TaskInstanceTest(unittest.TestCase):
# Clear the TI state since you can't run a task with a FAILED state without
# clearing it first
- ti.set_state(None, settings.Session())
+ dag.clear()
# third run -- up for retry
run_with_error(ti)
@@ -1154,3 +1155,137 @@ class TaskInstanceTest(unittest.TestCase):
with self.assertRaises(TestError):
ti.run()
+
+
+class ClearTasksTest(unittest.TestCase):
+ def test_clear_task_instances(self):
+ dag = DAG('test_clear_task_instances', start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10))
+ task0 = DummyOperator(task_id='0', owner='test', dag=dag)
+ task1 = DummyOperator(task_id='1', owner='test', dag=dag, retries=2)
+ ti0 = TI(task=task0, execution_date=DEFAULT_DATE)
+ ti1 = TI(task=task1, execution_date=DEFAULT_DATE)
+
+ ti0.run()
+ ti1.run()
+ session = settings.Session()
+ qry = session.query(TI).filter(
+ TI.dag_id == dag.dag_id).all()
+ clear_task_instances(qry, session, dag=dag)
+ session.commit()
+ ti0.refresh_from_db()
+ ti1.refresh_from_db()
+ self.assertEqual(ti0.try_number, 1)
+ self.assertEqual(ti0.max_tries, 1)
+ self.assertEqual(ti1.try_number, 1)
+ self.assertEqual(ti1.max_tries, 3)
+
+ def test_clear_task_instances_without_task(self):
+ dag = DAG('test_clear_task_instances_without_task', start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10))
+ task0 = DummyOperator(task_id='task0', owner='test', dag=dag)
+ task1 = DummyOperator(task_id='task1', owner='test', dag=dag, retries=2)
+ ti0 = TI(task=task0, execution_date=DEFAULT_DATE)
+ ti1 = TI(task=task1, execution_date=DEFAULT_DATE)
+ ti0.run()
+ ti1.run()
+
+ # Remove the task from dag.
+ dag.task_dict = {}
+ self.assertFalse(dag.has_task(task0.task_id))
+ self.assertFalse(dag.has_task(task1.task_id))
+
+ session = settings.Session()
+ qry = session.query(TI).filter(
+ TI.dag_id == dag.dag_id).all()
+ clear_task_instances(qry, session)
+ session.commit()
+ # When dag is None, max_tries will be maximum of original max_tries or try_number.
+ ti0.refresh_from_db()
+ ti1.refresh_from_db()
+ self.assertEqual(ti0.try_number, 1)
+ self.assertEqual(ti0.max_tries, 1)
+ self.assertEqual(ti1.try_number, 1)
+ self.assertEqual(ti1.max_tries, 2)
+
+ def test_clear_task_instances_without_dag(self):
+ dag = DAG('test_clear_task_instances_without_dag', start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10))
+ task0 = DummyOperator(task_id='task_0', owner='test', dag=dag)
+ task1 = DummyOperator(task_id='task_1', owner='test', dag=dag, retries=2)
+ ti0 = TI(task=task0, execution_date=DEFAULT_DATE)
+ ti1 = TI(task=task1, execution_date=DEFAULT_DATE)
+ ti0.run()
+ ti1.run()
+
+ session = settings.Session()
+ qry = session.query(TI).filter(
+ TI.dag_id == dag.dag_id).all()
+ clear_task_instances(qry, session)
+ session.commit()
+ # When dag is None, max_tries will be maximum of original max_tries or try_number.
+ ti0.refresh_from_db()
+ ti1.refresh_from_db()
+ self.assertEqual(ti0.try_number, 1)
+ self.assertEqual(ti0.max_tries, 1)
+ self.assertEqual(ti1.try_number, 1)
+ self.assertEqual(ti1.max_tries, 2)
+
+ def test_dag_clear(self):
+ dag = DAG('test_dag_clear', start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10))
+ task0 = DummyOperator(task_id='test_dag_clear_task_0', owner='test', dag=dag)
+ ti0 = TI(task=task0, execution_date=DEFAULT_DATE)
+ self.assertEqual(ti0.try_number, 0)
+ ti0.run()
+ self.assertEqual(ti0.try_number, 1)
+ dag.clear()
+ ti0.refresh_from_db()
+ self.assertEqual(ti0.try_number, 1)
+ self.assertEqual(ti0.state, State.NONE)
+ self.assertEqual(ti0.max_tries, 1)
+
+ task1 = DummyOperator(task_id='test_dag_clear_task_1', owner='test',
+ dag=dag, retries=2)
+ ti1 = TI(task=task1, execution_date=DEFAULT_DATE)
+ self.assertEqual(ti1.max_tries, 2)
+ ti1.try_number = 1
+ ti1.run()
+ self.assertEqual(ti1.try_number, 2)
+ self.assertEqual(ti1.max_tries, 2)
+
+ dag.clear()
+ ti0.refresh_from_db()
+ ti1.refresh_from_db()
+ # after clearing the dag, ti1 should show attempt 3 of 5
+ self.assertEqual(ti1.max_tries, 4)
+ self.assertEqual(ti1.try_number, 2)
+ # after clearing the dag, ti0 should show attempt 2 of 2
+ self.assertEqual(ti0.try_number, 1)
+ self.assertEqual(ti0.max_tries, 1)
+
+ def test_operator_clear(self):
+ dag = DAG('test_operator_clear', start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10))
+ t1 = DummyOperator(task_id='bash_op', owner='test', dag=dag)
+ t2 = DummyOperator(task_id='dummy_op', owner='test', dag=dag, retries=1)
+
+ t2.set_upstream(t1)
+
+ ti1 = TI(task=t1, execution_date=DEFAULT_DATE)
+ ti2 = TI(task=t2, execution_date=DEFAULT_DATE)
+ ti2.run()
+ # Dependency not met
+ self.assertEqual(ti2.try_number, 0)
+ self.assertEqual(ti2.max_tries, 1)
+
+ t2.clear(upstream=True)
+ ti1.run()
+ ti2.run()
+ self.assertEqual(ti1.try_number, 1)
+ # max_tries is 0 because there is no task instance in db for ti1
+ # so clear won't change the max_tries.
+ self.assertEqual(ti1.max_tries, 0)
+ self.assertEqual(ti2.try_number, 1)
+ # try_number (0) + retries(1)
+ self.assertEqual(ti2.max_tries, 1)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b49986c3/tests/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/python_operator.py b/tests/operators/python_operator.py
index 71432af..74120fe 100644
--- a/tests/operators/python_operator.py
+++ b/tests/operators/python_operator.py
@@ -117,8 +117,8 @@ class BranchOperatorTest(unittest.TestCase):
if ti.task_id == 'make_choice':
self.assertEquals(ti.state, State.SUCCESS)
elif ti.task_id == 'branch_1':
- # should not exist
- raise
+ # should exist with state None
+ self.assertEquals(ti.state, State.NONE)
elif ti.task_id == 'branch_2':
self.assertEquals(ti.state, State.SKIPPED)
else:
@@ -147,34 +147,31 @@ class BranchOperatorTest(unittest.TestCase):
class ShortCircuitOperatorTest(unittest.TestCase):
- def setUp(self):
- self.dag = DAG('shortcircuit_operator_test',
- default_args={
- 'owner': 'airflow',
- 'start_date': DEFAULT_DATE},
- schedule_interval=INTERVAL)
- self.short_op = ShortCircuitOperator(task_id='make_choice',
- dag=self.dag,
- python_callable=lambda: self.value)
-
- self.branch_1 = DummyOperator(task_id='branch_1', dag=self.dag)
- self.branch_1.set_upstream(self.short_op)
- self.branch_2 = DummyOperator(task_id='branch_2', dag=self.dag)
- self.branch_2.set_upstream(self.branch_1)
- self.upstream = DummyOperator(task_id='upstream', dag=self.dag)
- self.upstream.set_downstream(self.short_op)
- self.dag.clear()
-
- self.value = True
-
def test_without_dag_run(self):
"""This checks the defensive against non existent tasks in a dag run"""
- self.value = False
- self.short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ value = False
+ dag = DAG('shortcircuit_operator_test_without_dag_run',
+ default_args={
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE
+ },
+ schedule_interval=INTERVAL)
+ short_op = ShortCircuitOperator(task_id='make_choice',
+ dag=dag,
+ python_callable=lambda: value)
+ branch_1 = DummyOperator(task_id='branch_1', dag=dag)
+ branch_1.set_upstream(short_op)
+ branch_2 = DummyOperator(task_id='branch_2', dag=dag)
+ branch_2.set_upstream(branch_1)
+ upstream = DummyOperator(task_id='upstream', dag=dag)
+ upstream.set_downstream(short_op)
+ dag.clear()
+
+ short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
session = Session()
tis = session.query(TI).filter(
- TI.dag_id == self.dag.dag_id,
+ TI.dag_id == dag.dag_id,
TI.execution_date == DEFAULT_DATE
)
@@ -189,10 +186,10 @@ class ShortCircuitOperatorTest(unittest.TestCase):
else:
raise
- self.value = True
- self.dag.clear()
+ value = True
+ dag.clear()
- self.short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
for ti in tis:
if ti.task_id == 'make_choice':
self.assertEquals(ti.state, State.SUCCESS)
@@ -207,17 +204,34 @@ class ShortCircuitOperatorTest(unittest.TestCase):
session.close()
def test_with_dag_run(self):
- self.value = False
- logging.error("Tasks {}".format(self.dag.tasks))
- dr = self.dag.create_dagrun(
+ value = False
+ dag = DAG('shortcircuit_operator_test_with_dag_run',
+ default_args={
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE
+ },
+ schedule_interval=INTERVAL)
+ short_op = ShortCircuitOperator(task_id='make_choice',
+ dag=dag,
+ python_callable=lambda: value)
+ branch_1 = DummyOperator(task_id='branch_1', dag=dag)
+ branch_1.set_upstream(short_op)
+ branch_2 = DummyOperator(task_id='branch_2', dag=dag)
+ branch_2.set_upstream(branch_1)
+ upstream = DummyOperator(task_id='upstream', dag=dag)
+ upstream.set_downstream(short_op)
+ dag.clear()
+
+ logging.error("Tasks {}".format(dag.tasks))
+ dr = dag.create_dagrun(
run_id="manual__",
start_date=datetime.datetime.now(),
execution_date=DEFAULT_DATE,
state=State.RUNNING
)
- self.upstream.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- self.short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ upstream.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
tis = dr.get_task_instances()
self.assertEqual(len(tis), 4)
@@ -231,11 +245,11 @@ class ShortCircuitOperatorTest(unittest.TestCase):
else:
raise
- self.value = True
- self.dag.clear()
+ value = True
+ dag.clear()
dr.verify_integrity()
- self.upstream.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- self.short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ upstream.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
tis = dr.get_task_instances()
self.assertEqual(len(tis), 4)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b49986c3/tests/utils/test_dates.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_dates.py b/tests/utils/test_dates.py
index 56fae32..1323034 100644
--- a/tests/utils/test_dates.py
+++ b/tests/utils/test_dates.py
@@ -40,7 +40,3 @@ class Dates(unittest.TestCase):
self.assertTrue(
dates.days_ago(0, microsecond=3)
== today_midnight + timedelta(microseconds=3))
-
-
-if __name__ == '__main__':
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b49986c3/tests/utils/test_logging.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_logging.py b/tests/utils/test_logging.py
new file mode 100644
index 0000000..474430f
--- /dev/null
+++ b/tests/utils/test_logging.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.exceptions import AirflowException
+from airflow.utils import logging as logging_utils
+from datetime import datetime, timedelta
+
+class Logging(unittest.TestCase):
+
+ def test_get_log_filename(self):
+ dag_id = 'dag_id'
+ task_id = 'task_id'
+ execution_date = datetime(2017, 1, 1, 0, 0, 0)
+ try_number = 0
+ filename = logging_utils.get_log_filename(dag_id, task_id, execution_date, try_number)
+ self.assertEqual(filename, 'dag_id/task_id/2017-01-01T00:00:00/1.log')