Posted to commits@airflow.apache.org by "josh-fell (via GitHub)" <gi...@apache.org> on 2023/02/16 18:49:02 UTC

[GitHub] [airflow] josh-fell commented on a diff in pull request #29503: Get rid of state in Apache Beam provider hook

josh-fell commented on code in PR #29503:
URL: https://github.com/apache/airflow/pull/29503#discussion_r1108895095


##########
airflow/providers/apache/beam/hooks/beam.py:
##########
@@ -81,81 +81,85 @@ def beam_options_to_args(options: dict) -> list[str]:
     return args
 
 
-class BeamCommandRunner(LoggingMixin):
+def process_fd(
+    proc,
+    fd,
+    log: logging.Logger,
+    process_line_callback: Callable[[str], None] | None = None,
+):
+    """
+    Prints output to logs.
+
+    :param proc: subprocess.
+    :param fd: File descriptor.
+    :param process_line_callback: Optional callback which can be used to process
+        stdout and stderr to detect job id.
+    :param log: logger.
+    """
+    if fd not in (proc.stdout, proc.stderr):
+        raise Exception("No data in stderr or in stdout.")
+
+    fd_to_log = {proc.stderr: log.warning, proc.stdout: log.info}
+    func_log = fd_to_log[fd]
+
+    while True:
+        line = fd.readline().decode()
+        if not line:
+            return
+        if process_line_callback:
+            process_line_callback(line)
+        func_log(line.rstrip("\n"))
+
+
+def run_beam_command(
+    cmd: list[str],
+    log: logging.Logger,
+    process_line_callback: Callable[[str], None] | None = None,
+    working_directory: str | None = None,
+) -> None:
     """
     Class responsible for running pipeline command in subprocess

Review Comment:
   ```suggestion
       Function responsible for running pipeline command in subprocess.
   ```
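For context on the stateless, function-based shape this diff moves toward, here is a minimal sketch of how `process_fd` and a command runner can fit together without any class or instance state. Several parts are assumptions. `run_command_sketch` is a hypothetical name and not the provider's API. The hunk does not show the body of `run_beam_command`, so the choice of one reader thread per pipe and a `RuntimeError` on a non-zero exit code belong to this sketch, not necessarily to the PR. `process_fd` follows the hunk above, except that it raises `ValueError` with a clearer message instead of a bare `Exception`.

```python
from __future__ import annotations

import logging
import subprocess
import sys
import threading
from typing import Callable


def process_fd(
    proc: subprocess.Popen,
    fd,
    log: logging.Logger,
    process_line_callback: Callable[[str], None] | None = None,
) -> None:
    """Log each line from one pipe of ``proc`` until EOF."""
    if fd not in (proc.stdout, proc.stderr):
        raise ValueError("fd must be proc.stdout or proc.stderr")
    # stderr lines go to WARNING, stdout lines to INFO (same mapping as the diff).
    fd_to_log = {proc.stderr: log.warning, proc.stdout: log.info}
    func_log = fd_to_log[fd]
    while True:
        line = fd.readline().decode()
        if not line:  # readline() returned b"": the child closed this pipe (EOF)
            return
        if process_line_callback:
            process_line_callback(line)
        func_log(line.rstrip("\n"))


def run_command_sketch(
    cmd: list[str],
    log: logging.Logger,
    process_line_callback: Callable[[str], None] | None = None,
    working_directory: str | None = None,
) -> None:
    """Hypothetical stateless runner: everything it needs arrives as arguments."""
    # The context manager closes both pipes and waits for the child on exit.
    with subprocess.Popen(
        cmd,
        cwd=working_directory,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    ) as proc:
        # Assumption of this sketch: one reader thread per pipe, so a full
        # stderr buffer cannot stall the child while stdout is being drained.
        readers = [
            threading.Thread(
                target=process_fd, args=(proc, fd, log, process_line_callback)
            )
            for fd in (proc.stdout, proc.stderr)
        ]
        for reader in readers:
            reader.start()
        for reader in readers:
            reader.join()
        returncode = proc.wait()

    log.info("Process exited with return code: %s", returncode)
    if returncode != 0:
        raise RuntimeError(f"Command failed with return code {returncode}")


# Usage: the callback sees every line from both pipes; the relative order of
# stdout and stderr lines is not guaranteed.
logging.basicConfig(level=logging.INFO)
seen: list[str] = []
run_command_sketch(
    [sys.executable, "-c", "import sys; print('job id: 123'); print('oops', file=sys.stderr)"],
    logging.getLogger("beam-sketch"),
    process_line_callback=seen.append,
)
```

Because each pipe is read to EOF on its own thread, the callback may be invoked from two threads at once. A callback that mutates shared state should therefore be thread-safe; appending to a list, as in the usage above, is safe in CPython.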



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.