You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/03 19:30:38 UTC

[GitHub] [airflow] c-wilson commented on a change in pull request #7740: [AIRFLOW-7065] Add optional propagation of task std streams to console

c-wilson commented on a change in pull request #7740:
URL: https://github.com/apache/airflow/pull/7740#discussion_r419148387



##########
File path: airflow/utils/log/logging_mixin.py
##########
@@ -118,6 +129,53 @@ def isatty(self):
         """
         return False
 
+    def add_stream_target(self, target: IOBase):
+        """
+        Adds a stream target to propagate messages to in addition to the provided logger.
+        :param target: File like to write to.
+        :return:
+        """
+        if not hasattr(target, 'write'):
+            raise TypeError('Stream target must be writeable.')
+
+        self._additional_stream_targets.append(target)
+
+
+class _StreamToLogRedirector(AbstractContextManager):
+    """Context manager to redirect console stream to a StreamLogWriter"""
+    stream_to_replace: str
+    _existing_stream_target: List[IOBase]
+
+    def __init__(self, logger: logging.Logger, level: int, propagate_to_existing_stream: bool = False):
+        self.propagate_to_existing_stream = propagate_to_existing_stream
+        self._existing_stream_target = []
+        self._replacement_stream = StreamLogWriter(logger, level)
+
+    def __enter__(self):
+        """Saves existing stream target and replaces it will this instance."""
+        existing_stream = getattr(sys, self.stream_to_replace)
+        self._existing_stream_target.append(existing_stream)
+
+        if self.propagate_to_existing_stream:
+            self._replacement_stream.add_stream_target(existing_stream)
+
+        setattr(sys, self.stream_to_replace, self._replacement_stream)
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """Puts back existing stream target"""
+        self._replacement_stream.flush()
+        setattr(sys, self.stream_to_replace, self._existing_stream_target.pop())
+
+
+class StdoutToLog(_StreamToLogRedirector):

Review comment:
       It's possible, but I don't know if the result would be cleaner. Maybe you're right. This just serves as a compact way to pass the right stream to the StreamLogHandler when propagation is desired. This _could_ live in the task_command.py as if statements if we want.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org