You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/12/07 17:31:13 UTC
[1/2] incubator-airflow git commit: [AIRFLOW-1873] Set TI.try_number
to right value depending TI state
Repository: incubator-airflow
Updated Branches:
refs/heads/v1-9-stable 44adde47d -> 2205f498b
[AIRFLOW-1873] Set TI.try_number to right value depending TI state
Rather than having try_number+1 in various places,
try_number
will now automatically contain the right value for
when the TI
will next be run, and handle the case where
try_number is
accessed when the task is currently running.
This showed up as a bug where the logs from
running operators would
show up in the next log file (2.log for the first
try)
Closes #2832 from ashb/AIRFLOW-1873-task-operator-
log-try-number
(cherry picked from commit 4b4e504eeae81e48f3c9d796a61dd9e86000c663)
Signed-off-by: Bolke de Bruin <bo...@xs4all.nl>
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f205fae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f205fae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f205fae9
Branch: refs/heads/v1-9-stable
Commit: f205fae9abdba271c1eaecdf1c9db950154a8199
Parents: 44adde4
Author: Ash Berlin-Taylor <as...@firemirror.com>
Authored: Thu Dec 7 13:31:38 2017 +0000
Committer: Chris Riccomini <cr...@apache.org>
Committed: Thu Dec 7 09:30:22 2017 -0800
----------------------------------------------------------------------
airflow/models.py | 44 ++++++++++----
airflow/utils/log/file_task_handler.py | 8 +--
airflow/utils/log/gcs_task_handler.py | 4 +-
airflow/utils/log/s3_task_handler.py | 4 +-
tests/jobs.py | 3 +-
tests/models.py | 89 +++++++++++++++++++----------
tests/utils/test_log_handlers.py | 52 ++++++++++++++---
7 files changed, 144 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f205fae9/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 76f879c..e979b07 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -132,13 +132,13 @@ def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
if dag and dag.has_task(task_id):
task = dag.get_task(task_id)
task_retries = task.retries
- ti.max_tries = ti.try_number + task_retries
+ ti.max_tries = ti.try_number + task_retries - 1
else:
# Ignore errors when updating max_tries if dag is None or
# task not found in dag since database records could be
# outdated. We make max_tries the maximum value of its
# original max_tries or the current task try number.
- ti.max_tries = max(ti.max_tries, ti.try_number)
+ ti.max_tries = max(ti.max_tries, ti.try_number - 1)
ti.state = State.NONE
session.merge(ti)
@@ -769,7 +769,7 @@ class TaskInstance(Base, LoggingMixin):
end_date = Column(DateTime)
duration = Column(Float)
state = Column(String(20))
- try_number = Column(Integer, default=0)
+ _try_number = Column('try_number', Integer, default=0)
max_tries = Column(Integer)
hostname = Column(String(1000))
unixname = Column(String(1000))
@@ -811,6 +811,24 @@ class TaskInstance(Base, LoggingMixin):
""" Initialize the attributes that aren't stored in the DB. """
self.test_mode = False # can be changed when calling 'run'
+ @property
+ def try_number(self):
+ """
+ Return the try number that this task number will be when it is actually
+ run.
+
+ If the TI is currently running, this will match the column in the
+ database, in all other cases this will be incremented
+ """
+ # This is designed so that task logs end up in the right file.
+ if self.state == State.RUNNING:
+ return self._try_number
+ return self._try_number + 1
+
+ @try_number.setter
+ def try_number(self, value):
+ self._try_number = value
+
def command(
self,
mark_success=False,
@@ -1039,7 +1057,9 @@ class TaskInstance(Base, LoggingMixin):
self.state = ti.state
self.start_date = ti.start_date
self.end_date = ti.end_date
- self.try_number = ti.try_number
+ # Get the raw value of try_number column, don't read through the
+ # accessor here otherwise it will be incremented by one already.
+ self.try_number = ti._try_number
self.max_tries = ti.max_tries
self.hostname = ti.hostname
self.pid = ti.pid
@@ -1339,7 +1359,7 @@ class TaskInstance(Base, LoggingMixin):
# not 0-indexed lists (i.e. Attempt 1 instead of
# Attempt 0 for the first attempt).
msg = "Starting attempt {attempt} of {total}".format(
- attempt=self.try_number + 1,
+ attempt=self.try_number,
total=self.max_tries + 1)
self.start_date = datetime.utcnow()
@@ -1361,7 +1381,7 @@ class TaskInstance(Base, LoggingMixin):
self.state = State.NONE
msg = ("FIXME: Rescheduling due to concurrency limits reached at task "
"runtime. Attempt {attempt} of {total}. State set to NONE.").format(
- attempt=self.try_number + 1,
+ attempt=self.try_number,
total=self.max_tries + 1)
self.log.warning(hr + msg + hr)
@@ -1381,7 +1401,7 @@ class TaskInstance(Base, LoggingMixin):
# print status message
self.log.info(hr + msg + hr)
- self.try_number += 1
+ self._try_number += 1
if not test_mode:
session.add(Log(State.RUNNING, self))
@@ -1583,10 +1603,10 @@ class TaskInstance(Base, LoggingMixin):
# Let's go deeper
try:
- # try_number is incremented by 1 during task instance run. So the
- # current task instance try_number is the try_number for the next
- # task instance run. We only mark task instance as FAILED if the
- # next task instance try_number exceeds the max_tries.
+ # Since this function is called only when the TI state is running,
+ # try_number contains the current try_number (not the next). We
+ # only mark task instance as FAILED if the next task instance
+ # try_number exceeds the max_tries.
if task.retries and self.try_number <= self.max_tries:
self.state = State.UP_FOR_RETRY
self.log.info('Marking task as UP_FOR_RETRY')
@@ -1751,7 +1771,7 @@ class TaskInstance(Base, LoggingMixin):
"Host: {self.hostname}<br>"
"Log file: {self.log_filepath}<br>"
"Mark success: <a href='{self.mark_success_url}'>Link</a><br>"
- ).format(try_number=self.try_number + 1, max_tries=self.max_tries + 1, **locals())
+ ).format(try_number=self.try_number, max_tries=self.max_tries + 1, **locals())
send_email(task.email, title, body)
def set_duration(self):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f205fae9/airflow/utils/log/file_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6038fbf..9a8061a 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -89,7 +89,7 @@ class FileTaskHandler(logging.Handler):
# Task instance here might be different from task instance when
# initializing the handler. Thus explicitly getting log location
# is needed to get correct log path.
- log_relative_path = self._render_filename(ti, try_number + 1)
+ log_relative_path = self._render_filename(ti, try_number)
location = os.path.join(self.local_base, log_relative_path)
log = ""
@@ -142,8 +142,8 @@ class FileTaskHandler(logging.Handler):
next_try = task_instance.try_number
if try_number is None:
- try_numbers = list(range(next_try))
- elif try_number < 0:
+ try_numbers = list(range(1, next_try))
+ elif try_number < 1:
logs = ['Error fetching the logs. Try number {} is invalid.'.format(try_number)]
return logs
else:
@@ -174,7 +174,7 @@ class FileTaskHandler(logging.Handler):
# writable by both users, then it's possible that re-running a task
# via the UI (or vice versa) results in a permission error as the task
# tries to write to a log file created by the other user.
- relative_path = self._render_filename(ti, ti.try_number + 1)
+ relative_path = self._render_filename(ti, ti.try_number)
full_path = os.path.join(self.local_base, relative_path)
directory = os.path.dirname(full_path)
# Create the log file and give it group writable permissions
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f205fae9/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index c11e7ad..1520347 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -58,7 +58,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
# Log relative path is used to construct local and remote
# log path to upload log files into GCS and read from the
# remote location.
- self.log_relative_path = self._render_filename(ti, ti.try_number + 1)
+ self.log_relative_path = self._render_filename(ti, ti.try_number)
def close(self):
"""
@@ -94,7 +94,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
# Explicitly getting log relative path is necessary as the given
# task instance might be different than task instance passed in
# in set_context method.
- log_relative_path = self._render_filename(ti, try_number + 1)
+ log_relative_path = self._render_filename(ti, try_number)
remote_loc = os.path.join(self.remote_base, log_relative_path)
if self.gcs_log_exists(remote_loc):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f205fae9/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index cfa966a..5ff90c6 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -53,7 +53,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
super(S3TaskHandler, self).set_context(ti)
# Local location and remote location is needed to open and
# upload local log file to S3 remote storage.
- self.log_relative_path = self._render_filename(ti, ti.try_number + 1)
+ self.log_relative_path = self._render_filename(ti, ti.try_number)
def close(self):
"""
@@ -89,7 +89,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
# Explicitly getting log relative path is necessary as the given
# task instance might be different than task instance passed in
# in set_context method.
- log_relative_path = self._render_filename(ti, try_number + 1)
+ log_relative_path = self._render_filename(ti, try_number)
remote_loc = os.path.join(self.remote_base, log_relative_path)
if self.s3_log_exists(remote_loc):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f205fae9/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index e8fff7e..9d2f363 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -2352,10 +2352,11 @@ class SchedulerJobTest(unittest.TestCase):
(command, priority, queue, ti) = ti_tuple
ti.task = dag_task1
+ self.assertEqual(ti.try_number, 1)
# fail execution
run_with_error(ti)
self.assertEqual(ti.state, State.UP_FOR_RETRY)
- self.assertEqual(ti.try_number, 1)
+ self.assertEqual(ti.try_number, 2)
ti.refresh_from_db(lock_for_update=True, session=session)
ti.state = State.SCHEDULED
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f205fae9/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index a1de17d..88be50e 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -923,10 +923,11 @@ class TaskInstanceTest(unittest.TestCase):
ti = TI(
task=task, execution_date=datetime.datetime.now())
+ self.assertEqual(ti.try_number, 1)
# first run -- up for retry
run_with_error(ti)
self.assertEqual(ti.state, State.UP_FOR_RETRY)
- self.assertEqual(ti.try_number, 1)
+ self.assertEqual(ti.try_number, 2)
# second run -- still up for retry because retry_delay hasn't expired
run_with_error(ti)
@@ -963,16 +964,19 @@ class TaskInstanceTest(unittest.TestCase):
ti = TI(
task=task, execution_date=datetime.datetime.now())
+ self.assertEqual(ti.try_number, 1)
# first run -- up for retry
run_with_error(ti)
self.assertEqual(ti.state, State.UP_FOR_RETRY)
- self.assertEqual(ti.try_number, 1)
+ self.assertEqual(ti._try_number, 1)
+ self.assertEqual(ti.try_number, 2)
# second run -- fail
run_with_error(ti)
self.assertEqual(ti.state, State.FAILED)
- self.assertEqual(ti.try_number, 2)
+ self.assertEqual(ti._try_number, 2)
+ self.assertEqual(ti.try_number, 3)
# Clear the TI state since you can't run a task with a FAILED state without
# clearing it first
@@ -981,12 +985,15 @@ class TaskInstanceTest(unittest.TestCase):
# third run -- up for retry
run_with_error(ti)
self.assertEqual(ti.state, State.UP_FOR_RETRY)
- self.assertEqual(ti.try_number, 3)
+ self.assertEqual(ti._try_number, 3)
+ self.assertEqual(ti.try_number, 4)
# fourth run -- fail
run_with_error(ti)
+ ti.refresh_from_db()
self.assertEqual(ti.state, State.FAILED)
- self.assertEqual(ti.try_number, 4)
+ self.assertEqual(ti._try_number, 4)
+ self.assertEqual(ti.try_number, 5)
def test_next_retry_datetime(self):
delay = datetime.timedelta(seconds=30)
@@ -1007,17 +1014,16 @@ class TaskInstanceTest(unittest.TestCase):
task=task, execution_date=DEFAULT_DATE)
ti.end_date = datetime.datetime.now()
- ti.try_number = 1
dt = ti.next_retry_datetime()
# between 30 * 2^0.5 and 30 * 2^1 (15 and 30)
self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=20.0))
- ti.try_number = 4
+ ti.try_number = 3
dt = ti.next_retry_datetime()
# between 30 * 2^2 and 30 * 2^3 (120 and 240)
self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=181.0))
- ti.try_number = 6
+ ti.try_number = 5
dt = ti.next_retry_datetime()
# between 30 * 2^4 and 30 * 2^5 (480 and 960)
self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=825.0))
@@ -1224,7 +1230,11 @@ class TaskInstanceTest(unittest.TestCase):
task = DummyOperator(task_id='task', dag=dag, start_date=DEFAULT_DATE)
ti = TI(
task=task, execution_date=datetime.datetime.now())
+ self.assertEqual(ti._try_number, 0)
self.assertTrue(ti._check_and_change_state_before_execution())
+ # State should be running, and try_number column should be incremented
+ self.assertEqual(ti.state, State.RUNNING)
+ self.assertEqual(ti._try_number, 1)
def test_check_and_change_state_before_execution_dep_not_met(self):
dag = models.DAG(dag_id='test_check_and_change_state_before_execution')
@@ -1235,6 +1245,20 @@ class TaskInstanceTest(unittest.TestCase):
task=task2, execution_date=datetime.datetime.now())
self.assertFalse(ti._check_and_change_state_before_execution())
+ def test_try_number(self):
+ """
+ Test the try_number accessor behaves in various running states
+ """
+ dag = models.DAG(dag_id='test_check_and_change_state_before_execution')
+ task = DummyOperator(task_id='task', dag=dag, start_date=DEFAULT_DATE)
+ ti = TI(task=task, execution_date=datetime.datetime.utcnow())
+ self.assertEqual(1, ti.try_number)
+ ti.try_number = 2
+ ti.state = State.RUNNING
+ self.assertEqual(2, ti.try_number)
+ ti.state = State.SUCCESS
+ self.assertEqual(3, ti.try_number)
+
def test_get_num_running_task_instances(self):
session = settings.Session()
@@ -1257,7 +1281,7 @@ class TaskInstanceTest(unittest.TestCase):
self.assertEquals(1, ti1.get_num_running_task_instances(session=session))
self.assertEquals(1, ti2.get_num_running_task_instances(session=session))
self.assertEquals(1, ti3.get_num_running_task_instances(session=session))
-
+
class ClearTasksTest(unittest.TestCase):
def test_clear_task_instances(self):
@@ -1277,9 +1301,10 @@ class ClearTasksTest(unittest.TestCase):
session.commit()
ti0.refresh_from_db()
ti1.refresh_from_db()
- self.assertEqual(ti0.try_number, 1)
+ # Next try to run will be try 2
+ self.assertEqual(ti0.try_number, 2)
self.assertEqual(ti0.max_tries, 1)
- self.assertEqual(ti1.try_number, 1)
+ self.assertEqual(ti1.try_number, 2)
self.assertEqual(ti1.max_tries, 3)
def test_clear_task_instances_without_task(self):
@@ -1305,9 +1330,10 @@ class ClearTasksTest(unittest.TestCase):
# When dag is None, max_tries will be maximum of original max_tries or try_number.
ti0.refresh_from_db()
ti1.refresh_from_db()
- self.assertEqual(ti0.try_number, 1)
+ # Next try to run will be try 2
+ self.assertEqual(ti0.try_number, 2)
self.assertEqual(ti0.max_tries, 1)
- self.assertEqual(ti1.try_number, 1)
+ self.assertEqual(ti1.try_number, 2)
self.assertEqual(ti1.max_tries, 2)
def test_clear_task_instances_without_dag(self):
@@ -1328,9 +1354,10 @@ class ClearTasksTest(unittest.TestCase):
# When dag is None, max_tries will be maximum of original max_tries or try_number.
ti0.refresh_from_db()
ti1.refresh_from_db()
- self.assertEqual(ti0.try_number, 1)
+ # Next try to run will be try 2
+ self.assertEqual(ti0.try_number, 2)
self.assertEqual(ti0.max_tries, 1)
- self.assertEqual(ti1.try_number, 1)
+ self.assertEqual(ti1.try_number, 2)
self.assertEqual(ti1.max_tries, 2)
def test_dag_clear(self):
@@ -1338,12 +1365,13 @@ class ClearTasksTest(unittest.TestCase):
end_date=DEFAULT_DATE + datetime.timedelta(days=10))
task0 = DummyOperator(task_id='test_dag_clear_task_0', owner='test', dag=dag)
ti0 = TI(task=task0, execution_date=DEFAULT_DATE)
- self.assertEqual(ti0.try_number, 0)
- ti0.run()
+ # Next try to run will be try 1
self.assertEqual(ti0.try_number, 1)
+ ti0.run()
+ self.assertEqual(ti0.try_number, 2)
dag.clear()
ti0.refresh_from_db()
- self.assertEqual(ti0.try_number, 1)
+ self.assertEqual(ti0.try_number, 2)
self.assertEqual(ti0.state, State.NONE)
self.assertEqual(ti0.max_tries, 1)
@@ -1352,8 +1380,9 @@ class ClearTasksTest(unittest.TestCase):
ti1 = TI(task=task1, execution_date=DEFAULT_DATE)
self.assertEqual(ti1.max_tries, 2)
ti1.try_number = 1
+ # Next try will be 2
ti1.run()
- self.assertEqual(ti1.try_number, 2)
+ self.assertEqual(ti1.try_number, 3)
self.assertEqual(ti1.max_tries, 2)
dag.clear()
@@ -1361,9 +1390,9 @@ class ClearTasksTest(unittest.TestCase):
ti1.refresh_from_db()
# after clear dag, ti2 should show attempt 3 of 5
self.assertEqual(ti1.max_tries, 4)
- self.assertEqual(ti1.try_number, 2)
+ self.assertEqual(ti1.try_number, 3)
# after clear dag, ti1 should show attempt 2 of 2
- self.assertEqual(ti0.try_number, 1)
+ self.assertEqual(ti0.try_number, 2)
self.assertEqual(ti0.max_tries, 1)
def test_dags_clear(self):
@@ -1383,7 +1412,7 @@ class ClearTasksTest(unittest.TestCase):
for i in range(num_of_dags):
tis[i].run()
self.assertEqual(tis[i].state, State.SUCCESS)
- self.assertEqual(tis[i].try_number, 1)
+ self.assertEqual(tis[i].try_number, 2)
self.assertEqual(tis[i].max_tries, 0)
DAG.clear_dags(dags)
@@ -1391,14 +1420,14 @@ class ClearTasksTest(unittest.TestCase):
for i in range(num_of_dags):
tis[i].refresh_from_db()
self.assertEqual(tis[i].state, State.NONE)
- self.assertEqual(tis[i].try_number, 1)
+ self.assertEqual(tis[i].try_number, 2)
self.assertEqual(tis[i].max_tries, 1)
# test dry_run
for i in range(num_of_dags):
tis[i].run()
self.assertEqual(tis[i].state, State.SUCCESS)
- self.assertEqual(tis[i].try_number, 2)
+ self.assertEqual(tis[i].try_number, 3)
self.assertEqual(tis[i].max_tries, 1)
DAG.clear_dags(dags, dry_run=True)
@@ -1406,7 +1435,7 @@ class ClearTasksTest(unittest.TestCase):
for i in range(num_of_dags):
tis[i].refresh_from_db()
self.assertEqual(tis[i].state, State.SUCCESS)
- self.assertEqual(tis[i].try_number, 2)
+ self.assertEqual(tis[i].try_number, 3)
self.assertEqual(tis[i].max_tries, 1)
# test only_failed
@@ -1422,11 +1451,11 @@ class ClearTasksTest(unittest.TestCase):
tis[i].refresh_from_db()
if i != failed_dag_idx:
self.assertEqual(tis[i].state, State.SUCCESS)
- self.assertEqual(tis[i].try_number, 2)
+ self.assertEqual(tis[i].try_number, 3)
self.assertEqual(tis[i].max_tries, 1)
else:
self.assertEqual(tis[i].state, State.NONE)
- self.assertEqual(tis[i].try_number, 2)
+ self.assertEqual(tis[i].try_number, 3)
self.assertEqual(tis[i].max_tries, 2)
def test_operator_clear(self):
@@ -1441,17 +1470,17 @@ class ClearTasksTest(unittest.TestCase):
ti2 = TI(task=t2, execution_date=DEFAULT_DATE)
ti2.run()
# Dependency not met
- self.assertEqual(ti2.try_number, 0)
+ self.assertEqual(ti2.try_number, 1)
self.assertEqual(ti2.max_tries, 1)
t2.clear(upstream=True)
ti1.run()
ti2.run()
- self.assertEqual(ti1.try_number, 1)
+ self.assertEqual(ti1.try_number, 2)
# max_tries is 0 because there is no task instance in db for ti1
# so clear won't change the max_tries.
self.assertEqual(ti1.max_tries, 0)
- self.assertEqual(ti2.try_number, 1)
+ self.assertEqual(ti2.try_number, 2)
# try_number (0) + retries(1)
self.assertEqual(ti2.max_tries, 1)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f205fae9/tests/utils/test_log_handlers.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 25faa7c..54e8cff 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -12,17 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import copy
import logging
import logging.config
-import mock
import os
import unittest
+import six
from datetime import datetime
-from airflow.models import TaskInstance, DAG
+from airflow.models import TaskInstance, DAG, DagRun
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.operators.dummy_operator import DummyOperator
+from airflow.settings import Session
+from airflow.utils.log.logging_mixin import set_context
from airflow.utils.log.file_task_handler import FileTaskHandler
DEFAULT_DATE = datetime(2016, 1, 1)
@@ -31,11 +32,22 @@ FILE_TASK_HANDLER = 'file.task'
class TestFileTaskLogHandler(unittest.TestCase):
+ def cleanUp(self):
+ session = Session()
+
+ session.query(DagRun).delete()
+ session.query(TaskInstance).delete()
def setUp(self):
super(TestFileTaskLogHandler, self).setUp()
- # We use file task handler by default.
logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+ logging.root.disabled = False
+ self.cleanUp()
+ # We use file task handler by default.
+
+ def tearDown(self):
+ self.cleanUp()
+ super(TestFileTaskLogHandler, self).tearDown()
def test_default_task_logging_setup(self):
# file task handler is used by default.
@@ -46,29 +58,51 @@ class TestFileTaskLogHandler(unittest.TestCase):
self.assertEqual(handler.name, FILE_TASK_HANDLER)
def test_file_task_handler(self):
+ def task_callable(ti, **kwargs):
+ ti.log.info("test")
dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE)
- task = DummyOperator(task_id='task_for_testing_file_log_handler', dag=dag)
+ task = PythonOperator(
+ task_id='task_for_testing_file_log_handler',
+ dag=dag,
+ python_callable=task_callable,
+ provide_context=True
+ )
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
- logger = logging.getLogger(TASK_LOGGER)
+ logger = ti.log
+ ti.log.disabled = False
+
file_handler = next((handler for handler in logger.handlers
if handler.name == FILE_TASK_HANDLER), None)
self.assertIsNotNone(file_handler)
- file_handler.set_context(ti)
+ set_context(logger, ti)
self.assertIsNotNone(file_handler.handler)
# We expect set_context generates a file locally.
log_filename = file_handler.handler.baseFilename
self.assertTrue(os.path.isfile(log_filename))
+ self.assertTrue(log_filename.endswith("1.log"), log_filename)
+
+ ti.run(ignore_ti_state=True)
- logger.info("test")
- ti.run()
+ file_handler.flush()
+ file_handler.close()
self.assertTrue(hasattr(file_handler, 'read'))
# Return value of read must be a list.
logs = file_handler.read(ti)
self.assertTrue(isinstance(logs, list))
self.assertEqual(len(logs), 1)
+ target_re = r'\n\[[^\]]+\] {test_log_handlers.py:\d+} INFO - test\n'
+
+ # We should expect our log line from the callable above to appear in
+ # the logs we read back
+ six.assertRegex(
+ self,
+ logs[0],
+ target_re,
+ "Logs were " + str(logs)
+ )
# Remove the generated tmp log file.
os.remove(log_filename)
[2/2] incubator-airflow git commit: [AIRFLOW-XXX] Bump version to
1.9.0rc5
Posted by cr...@apache.org.
[AIRFLOW-XXX] Bump version to 1.9.0rc5
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2205f498
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2205f498
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2205f498
Branch: refs/heads/v1-9-stable
Commit: 2205f498b7a9f7775a70c41ee7ad8052cf346894
Parents: f205fae
Author: Chris Riccomini <cr...@apache.org>
Authored: Thu Dec 7 09:30:50 2017 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Thu Dec 7 09:30:50 2017 -0800
----------------------------------------------------------------------
airflow/version.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2205f498/airflow/version.py
----------------------------------------------------------------------
diff --git a/airflow/version.py b/airflow/version.py
index db66eb8..b490558 100644
--- a/airflow/version.py
+++ b/airflow/version.py
@@ -13,4 +13,4 @@
# limitations under the License.
#
-version = '1.9.0rc4'
+version = '1.9.0rc5'