You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/03/22 19:16:35 UTC

[airflow] 24/31: Log traceback in trigger excs (#21213)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7477ad9efbceabde62237822115b6c8d62012360
Author: Malthe Borch <mb...@gmail.com>
AuthorDate: Mon Feb 28 22:41:39 2022 +0000

    Log traceback in trigger excs (#21213)
    
    (cherry picked from commit 4ad21f5f7c2d416cf813a860564bc2bf3e161d46)
---
 airflow/__init__.py              |  1 +
 airflow/jobs/triggerer_job.py    | 18 ++++++++++--------
 airflow/models/taskinstance.py   |  3 +++
 airflow/models/trigger.py        |  6 ++++--
 tests/jobs/test_triggerer_job.py | 11 ++++++++---
 5 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/airflow/__init__.py b/airflow/__init__.py
index 173adc5..3421220 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -26,6 +26,7 @@ in their PYTHONPATH. airflow_login should be based off the
 isort:skip_file
 """
 
+
 # flake8: noqa: F401
 
 import sys
diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py
index dff0e0f..bd145f7 100644
--- a/airflow/jobs/triggerer_job.py
+++ b/airflow/jobs/triggerer_job.py
@@ -168,8 +168,8 @@ class TriggererJob(BaseJob):
         """
         while self.runner.failed_triggers:
             # Tell the model to fail this trigger's deps
-            trigger_id = self.runner.failed_triggers.popleft()
-            Trigger.submit_failure(trigger_id=trigger_id)
+            trigger_id, saved_exc = self.runner.failed_triggers.popleft()
+            Trigger.submit_failure(trigger_id=trigger_id, exc=saved_exc)
             # Emit stat event
             Stats.incr('triggers.failed')
 
@@ -211,7 +211,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
     events: Deque[Tuple[int, TriggerEvent]]
 
     # Outbound queue of failed triggers
-    failed_triggers: Deque[int]
+    failed_triggers: Deque[Tuple[int, BaseException]]
 
     # Should-we-stop flag
     stop: bool = False
@@ -291,6 +291,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         for trigger_id, details in list(self.triggers.items()):  # pylint: disable=too-many-nested-blocks
             if details["task"].done():
                 # Check to see if it exited for good reasons
+                saved_exc = None
                 try:
                     result = details["task"].result()
                 except (asyncio.CancelledError, SystemExit, KeyboardInterrupt):
@@ -301,7 +302,8 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                     continue
                 except BaseException as e:
                     # This is potentially bad, so log it.
-                    self.log.error("Trigger %s exited with error %s", details["name"], e)
+                    self.log.exception("Trigger %s exited with error %s", details["name"], e)
+                    saved_exc = e
                 else:
                     # See if they foolishly returned a TriggerEvent
                     if isinstance(result, TriggerEvent):
@@ -315,7 +317,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                         "Trigger %s exited without sending an event. Dependent tasks will be failed.",
                         details["name"],
                     )
-                    self.failed_triggers.append(trigger_id)
+                    self.failed_triggers.append((trigger_id, saved_exc))
                 del self.triggers[trigger_id]
             await asyncio.sleep(0)
 
@@ -386,7 +388,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
             running_trigger_ids.union(x[0] for x in self.events)
             .union(self.to_cancel)
             .union(x[0] for x in self.to_create)
-            .union(self.failed_triggers)
+            .union(trigger[0] for trigger in self.failed_triggers)
         )
         # Work out the two difference sets
         new_trigger_ids = requested_trigger_ids - known_trigger_ids
@@ -402,9 +404,9 @@ class TriggerRunner(threading.Thread, LoggingMixin):
             # Resolve trigger record into an actual class instance
             try:
                 trigger_class = self.get_trigger_by_classpath(new_triggers[new_id].classpath)
-            except BaseException:
+            except BaseException as e:
                 # Either the trigger code or the path to it is bad. Fail the trigger.
-                self.failed_triggers.append(new_id)
+                self.failed_triggers.append((new_id, e))
                 continue
             self.to_create.append((new_id, trigger_class(**new_triggers[new_id].kwargs)))
         # Enqueue orphaned triggers for cancellation
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 2dcc923..f6440b4 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1486,6 +1486,9 @@ class TaskInstance(Base, LoggingMixin):
             # this task was scheduled specifically to fail.
             if self.next_method == "__fail__":
                 next_kwargs = self.next_kwargs or {}
+                traceback = self.next_kwargs.get("traceback")
+                if traceback is not None:
+                    self.log.error("Trigger failed:\n%s", "\n".join(traceback))
                 raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
             # Grab the callable off the Operator/Task and add in any kwargs
             execute_callable = getattr(task_copy, self.next_method)
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index aa0d2b1..d222fc7 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import datetime
+from traceback import format_exception
 from typing import Any, Dict, List, Optional
 
 from sqlalchemy import Column, Integer, String, func, or_
@@ -124,7 +125,7 @@ class Trigger(Base):
 
     @classmethod
     @provide_session
-    def submit_failure(cls, trigger_id, session=None):
+    def submit_failure(cls, trigger_id, exc=None, session=None):
         """
         Called when a trigger has failed unexpectedly, and we need to mark
         everything that depended on it as failed. Notably, we have to actually
@@ -144,8 +145,9 @@ class Trigger(Base):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the error and set the next_method to the fail state
+            traceback = format_exception(type(exc), exc, exc.__traceback__) if exc else None
             task_instance.next_method = "__fail__"
-            task_instance.next_kwargs = {"error": "Trigger failure"}
+            task_instance.next_kwargs = {"error": "Trigger failure", "traceback": traceback}
             # Remove ourselves as its trigger
             task_instance.trigger_id = None
             # Finally, mark it as scheduled so it gets re-queued
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index 870116a..eee5ee2 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -390,7 +390,11 @@ def test_trigger_failing(session):
         # Wait for up to 3 seconds for it to fire and appear in the event queue
         for _ in range(30):
             if job.runner.failed_triggers:
-                assert list(job.runner.failed_triggers) == [1]
+                assert len(job.runner.failed_triggers) == 1
+                trigger_id, exc = list(job.runner.failed_triggers)[0]
+                assert trigger_id == 1
+                assert isinstance(exc, ValueError)
+                assert exc.args[0] == "Deliberate trigger failure"
                 break
             time.sleep(0.1)
         else:
@@ -448,7 +452,7 @@ def test_invalid_trigger(session, dag_maker):
     job.load_triggers()
 
     # Make sure it turned up in the failed queue
-    assert list(job.runner.failed_triggers) == [1]
+    assert len(job.runner.failed_triggers) == 1
 
     # Run the failed trigger handler
     job.handle_failed_triggers()
@@ -458,4 +462,5 @@ def test_invalid_trigger(session, dag_maker):
     task_instance.refresh_from_db()
     assert task_instance.state == TaskInstanceState.SCHEDULED
     assert task_instance.next_method == "__fail__"
-    assert task_instance.next_kwargs == {'error': 'Trigger failure'}
+    assert task_instance.next_kwargs['error'] == 'Trigger failure'
+    assert task_instance.next_kwargs['traceback'][-1] == "ModuleNotFoundError: No module named 'fake'\n"