You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/18 04:20:19 UTC

[GitHub] stale[bot] closed pull request #2026: [AIRFLOW-811] [BugFix] bash_operator does not return full output

stale[bot] closed pull request #2026: [AIRFLOW-811] [BugFix] bash_operator does not return full output
URL: https://github.com/apache/incubator-airflow/pull/2026
 
 
   

This is a PR from a forked repository (closed here without being merged).
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index ff2ed51b96..ebba0ee07e 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -13,11 +13,15 @@
 # limitations under the License.
 
 
-from builtins import bytes
+import logging
+import mmap
 import os
+import re
 import signal
+import io
 from subprocess import Popen, STDOUT, PIPE
 from tempfile import gettempdir, NamedTemporaryFile
+from builtins import bytes
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
@@ -52,60 +56,90 @@ def __init__(
             bash_command,
             xcom_push=False,
             env=None,
+            log_outout=True,
             output_encoding='utf-8',
+            output_regex_filter=None,
             *args, **kwargs):
 
         super(BashOperator, self).__init__(*args, **kwargs)
         self.bash_command = bash_command
         self.env = env
-        self.xcom_push_flag = xcom_push
+        self.xcom_push = xcom_push
+        self.log_outout = log_outout
         self.output_encoding = output_encoding
+        self.output_regex_filter = output_regex_filter
+        self.sp = None
 
     def execute(self, context):
         """
         Execute the bash command in a temporary directory
         which will be cleaned afterwards
         """
-        bash_command = self.bash_command
-        self.log.info("Tmp dir root location: \n %s", gettempdir())
         with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
-            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
-
-                f.write(bytes(bash_command, 'utf_8'))
-                f.flush()
-                fname = f.name
+            self.log.info("Tmp dir root location: {0}".format(tmp_dir))
+            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as cmd_file, \
+                    NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as stdout_file, \
+                    NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as stderr_file:
+
+                cmd_file.write(bytes(self.bash_command, 'utf_8'))
+                cmd_file.flush()
+                fname = cmd_file.name
                 script_location = tmp_dir + "/" + fname
-                self.log.info(
-                    "Temporary script location: %s",
-                    script_location
-                )
-                self.log.info("Running command: %s", bash_command)
-                sp = Popen(
-                    ['bash', fname],
-                    stdout=PIPE, stderr=STDOUT,
-                    cwd=tmp_dir, env=self.env,
-                    preexec_fn=os.setsid)
-
-                self.sp = sp
-
-                self.log.info("Output:")
-                line = ''
-                for line in iter(sp.stdout.readline, b''):
-                    line = line.decode(self.output_encoding).strip()
-                    self.log.info(line)
-                sp.wait()
-                self.log.info(
-                    "Command exited with return code %s",
-                    sp.returncode
-                )
-
-                if sp.returncode:
-                    raise AirflowException("Bash command failed")
-
-        if self.xcom_push_flag:
-            return line
+                logging.info("Temporary script location :{0}".format(script_location))
+                logging.info("Running command: " + self.bash_command)
+                self.sp = Popen(
+                        ['bash', fname],
+                        stdout=stdout_file,
+                        stderr=stderr_file,
+                        cwd=tmp_dir,
+                        env=self.env,
+                        preexec_fn=os.setsid)
+
+                self.sp.wait()
+
+                exit_msg = "Command exited with return code {0}".format(self.sp.returncode)
+                if self.sp.returncode:
+                    stderr_output = None
+                    with io.open(stderr_file.name, 'r+', encoding=self.output_encoding) as stderr_file_handle:
+                        if os.path.getsize(stderr_file.name) > 0:
+                            stderr_output = mmap.mmap(stderr_file_handle.fileno(), 0, access=mmap.ACCESS_READ)
+                    raise AirflowException("Bash command failed, {0}, error: {1}"
+                                           .format(exit_msg, stderr_output))
+
+                logging.info(exit_msg)
+                output = None
+                if self.output_regex_filter or self.log_outout or self.xcom_push:
+                    with io.open(stdout_file.name, 'r+', encoding=self.output_encoding) as stdout_file_handle:
+                        if os.path.getsize(stdout_file_handle.name) > 0:
+                            output = mmap.mmap(stdout_file_handle.fileno(), 0, access=mmap.ACCESS_READ)
+                            if self.output_regex_filter:
+                                pattern = self.output_regex_filter.encode("utf-8")
+                                try:
+                                    re.compile(pattern)
+                                except re.error:
+                                    raise AirflowException("command executed successfully, "
+                                                           "but Invalid regex supplied {0} "
+                                                           .format(self.output_regex_filter))
+
+                                filtered_output = re.search(pattern, output)
+                                if filtered_output:
+                                    return filtered_output.group().decode(self.output_encoding)
+                                else:
+                                    logging.warning("failed to match on output based on "
+                                                    "supplied regex : {0}"
+                                                    .format(self.output_regex_filter))
+
+                            if self.log_outout:
+                                logging.info("stdout: {0}".format(output))
+
+                            if self.xcom_push:
+                                return output
+                        else:
+                            logging.warning("stdout file: {0} is empty".format(stdout_file.name))
+                else:
+                    logging.warning("Not logging stdout for the command: {0}"
+                                    .format(self.bash_command))
 
     def on_kill(self):
         self.log.info('Sending SIGTERM signal to bash process group')
         os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
-
diff --git a/tests/core.py b/tests/core.py
index 0c94137d15..c61d9a1fc2 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -60,6 +60,7 @@
 from lxml import html
 from airflow.exceptions import AirflowException
 from airflow.configuration import AirflowConfigException, run_command
+from airflow.models import DAG, TaskInstance
 from jinja2.sandbox import SecurityError
 from jinja2 import UndefinedError
 
@@ -419,6 +420,25 @@ def test_bash_operator_multi_byte_output(self):
             output_encoding='utf-8')
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_bash_operator_multi_line_regexed_output(self):
+        t = BashOperator(
+                task_id='test_multi_line_bash_operator',
+                bash_command=r"echo 'this is first line. \n "
+                             r"this is 2nd line, \n "
+                             r"you can have n number of lines in output."
+                             r"This is last line.'",
+                xcom_push=True,
+                output_regex_filter="\s?\w+\s?\d\w+ line,",
+                dag=self.dag,
+                output_encoding='utf-8')
+
+        ti = TaskInstance(task=t, execution_date=datetime.now())
+        ti.run()
+        expected_regex_matched_output = ' is 2nd line,'
+        self.assertEqual(ti.xcom_pull(task_ids='test_multi_line_bash_operator',
+                                      key='return_value'),
+                         expected_regex_matched_output)
+
     def test_bash_operator_kill(self):
         import subprocess
         import psutil


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services