You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/03/22 17:51:39 UTC
[airflow] 24/30: Log traceback in trigger excs (#21213)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b78760e186621133d1e6568335a4f9c18ef92d87
Author: Malthe Borch <mb...@gmail.com>
AuthorDate: Mon Feb 28 22:41:39 2022 +0000
Log traceback in trigger excs (#21213)
(cherry picked from commit 4ad21f5f7c2d416cf813a860564bc2bf3e161d46)
---
airflow/__init__.py | 1 +
airflow/jobs/triggerer_job.py | 18 ++++++++++--------
airflow/models/taskinstance.py | 3 +++
airflow/models/trigger.py | 6 ++++--
tests/jobs/test_triggerer_job.py | 11 ++++++++---
5 files changed, 26 insertions(+), 13 deletions(-)
diff --git a/airflow/__init__.py b/airflow/__init__.py
index 173adc5..3421220 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -26,6 +26,7 @@ in their PYTHONPATH. airflow_login should be based off the
isort:skip_file
"""
+
# flake8: noqa: F401
import sys
diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py
index dff0e0f..bd145f7 100644
--- a/airflow/jobs/triggerer_job.py
+++ b/airflow/jobs/triggerer_job.py
@@ -168,8 +168,8 @@ class TriggererJob(BaseJob):
"""
while self.runner.failed_triggers:
# Tell the model to fail this trigger's deps
- trigger_id = self.runner.failed_triggers.popleft()
- Trigger.submit_failure(trigger_id=trigger_id)
+ trigger_id, saved_exc = self.runner.failed_triggers.popleft()
+ Trigger.submit_failure(trigger_id=trigger_id, exc=saved_exc)
# Emit stat event
Stats.incr('triggers.failed')
@@ -211,7 +211,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
events: Deque[Tuple[int, TriggerEvent]]
# Outbound queue of failed triggers
- failed_triggers: Deque[int]
+ failed_triggers: Deque[Tuple[int, BaseException]]
# Should-we-stop flag
stop: bool = False
@@ -291,6 +291,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
for trigger_id, details in list(self.triggers.items()): # pylint: disable=too-many-nested-blocks
if details["task"].done():
# Check to see if it exited for good reasons
+ saved_exc = None
try:
result = details["task"].result()
except (asyncio.CancelledError, SystemExit, KeyboardInterrupt):
@@ -301,7 +302,8 @@ class TriggerRunner(threading.Thread, LoggingMixin):
continue
except BaseException as e:
# This is potentially bad, so log it.
- self.log.error("Trigger %s exited with error %s", details["name"], e)
+ self.log.exception("Trigger %s exited with error %s", details["name"], e)
+ saved_exc = e
else:
# See if they foolishly returned a TriggerEvent
if isinstance(result, TriggerEvent):
@@ -315,7 +317,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
"Trigger %s exited without sending an event. Dependent tasks will be failed.",
details["name"],
)
- self.failed_triggers.append(trigger_id)
+ self.failed_triggers.append((trigger_id, saved_exc))
del self.triggers[trigger_id]
await asyncio.sleep(0)
@@ -386,7 +388,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
running_trigger_ids.union(x[0] for x in self.events)
.union(self.to_cancel)
.union(x[0] for x in self.to_create)
- .union(self.failed_triggers)
+ .union(trigger[0] for trigger in self.failed_triggers)
)
# Work out the two difference sets
new_trigger_ids = requested_trigger_ids - known_trigger_ids
@@ -402,9 +404,9 @@ class TriggerRunner(threading.Thread, LoggingMixin):
# Resolve trigger record into an actual class instance
try:
trigger_class = self.get_trigger_by_classpath(new_triggers[new_id].classpath)
- except BaseException:
+ except BaseException as e:
# Either the trigger code or the path to it is bad. Fail the trigger.
- self.failed_triggers.append(new_id)
+ self.failed_triggers.append((new_id, e))
continue
self.to_create.append((new_id, trigger_class(**new_triggers[new_id].kwargs)))
# Enqueue orphaned triggers for cancellation
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 2dcc923..f6440b4 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1486,6 +1486,9 @@ class TaskInstance(Base, LoggingMixin):
# this task was scheduled specifically to fail.
if self.next_method == "__fail__":
next_kwargs = self.next_kwargs or {}
+ traceback = self.next_kwargs.get("traceback")
+ if traceback is not None:
+ self.log.error("Trigger failed:\n%s", "\n".join(traceback))
raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
# Grab the callable off the Operator/Task and add in any kwargs
execute_callable = getattr(task_copy, self.next_method)
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index aa0d2b1..d222fc7 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import datetime
+from traceback import format_exception
from typing import Any, Dict, List, Optional
from sqlalchemy import Column, Integer, String, func, or_
@@ -124,7 +125,7 @@ class Trigger(Base):
@classmethod
@provide_session
- def submit_failure(cls, trigger_id, session=None):
+ def submit_failure(cls, trigger_id, exc=None, session=None):
"""
Called when a trigger has failed unexpectedly, and we need to mark
everything that depended on it as failed. Notably, we have to actually
@@ -144,8 +145,9 @@ class Trigger(Base):
TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
):
# Add the error and set the next_method to the fail state
+ traceback = format_exception(type(exc), exc, exc.__traceback__) if exc else None
task_instance.next_method = "__fail__"
- task_instance.next_kwargs = {"error": "Trigger failure"}
+ task_instance.next_kwargs = {"error": "Trigger failure", "traceback": traceback}
# Remove ourselves as its trigger
task_instance.trigger_id = None
# Finally, mark it as scheduled so it gets re-queued
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index 870116a..eee5ee2 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -390,7 +390,11 @@ def test_trigger_failing(session):
# Wait for up to 3 seconds for it to fire and appear in the event queue
for _ in range(30):
if job.runner.failed_triggers:
- assert list(job.runner.failed_triggers) == [1]
+ assert len(job.runner.failed_triggers) == 1
+ trigger_id, exc = list(job.runner.failed_triggers)[0]
+ assert trigger_id == 1
+ assert isinstance(exc, ValueError)
+ assert exc.args[0] == "Deliberate trigger failure"
break
time.sleep(0.1)
else:
@@ -448,7 +452,7 @@ def test_invalid_trigger(session, dag_maker):
job.load_triggers()
# Make sure it turned up in the failed queue
- assert list(job.runner.failed_triggers) == [1]
+ assert len(job.runner.failed_triggers) == 1
# Run the failed trigger handler
job.handle_failed_triggers()
@@ -458,4 +462,5 @@ def test_invalid_trigger(session, dag_maker):
task_instance.refresh_from_db()
assert task_instance.state == TaskInstanceState.SCHEDULED
assert task_instance.next_method == "__fail__"
- assert task_instance.next_kwargs == {'error': 'Trigger failure'}
+ assert task_instance.next_kwargs['error'] == 'Trigger failure'
+ assert task_instance.next_kwargs['traceback'][-1] == "ModuleNotFoundError: No module named 'fake'\n"