You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this conversion.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/18 04:20:19 UTC
[GitHub] stale[bot] closed pull request #2026: [AIRFLOW-811] [BugFix] bash_operator does not return full output
stale[bot] closed pull request #2026: [AIRFLOW-811] [BugFix] bash_operator does not return full output
URL: https://github.com/apache/incubator-airflow/pull/2026
This pull request was opened from a forked repository and was closed by stale[bot] without being merged.
Because GitHub does not otherwise display the diff of a pull request from a fork, the diff is
reproduced below for the sake of provenance:
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index ff2ed51b96..ebba0ee07e 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -13,11 +13,15 @@
# limitations under the License.
-from builtins import bytes
+import logging
+import mmap
import os
+import re
import signal
+import io
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir, NamedTemporaryFile
+from builtins import bytes
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
@@ -52,60 +56,90 @@ def __init__(
bash_command,
xcom_push=False,
env=None,
+ log_outout=True,
output_encoding='utf-8',
+ output_regex_filter=None,
*args, **kwargs):
super(BashOperator, self).__init__(*args, **kwargs)
self.bash_command = bash_command
self.env = env
- self.xcom_push_flag = xcom_push
+ self.xcom_push = xcom_push
+ self.log_outout = log_outout
self.output_encoding = output_encoding
+ self.output_regex_filter = output_regex_filter
+ self.sp = None
def execute(self, context):
"""
Execute the bash command in a temporary directory
which will be cleaned afterwards
"""
- bash_command = self.bash_command
- self.log.info("Tmp dir root location: \n %s", gettempdir())
with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
- with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
-
- f.write(bytes(bash_command, 'utf_8'))
- f.flush()
- fname = f.name
+ self.log.info("Tmp dir root location: {0}".format(tmp_dir))
+ with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as cmd_file, \
+ NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as stdout_file, \
+ NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as stderr_file:
+
+ cmd_file.write(bytes(self.bash_command, 'utf_8'))
+ cmd_file.flush()
+ fname = cmd_file.name
script_location = tmp_dir + "/" + fname
- self.log.info(
- "Temporary script location: %s",
- script_location
- )
- self.log.info("Running command: %s", bash_command)
- sp = Popen(
- ['bash', fname],
- stdout=PIPE, stderr=STDOUT,
- cwd=tmp_dir, env=self.env,
- preexec_fn=os.setsid)
-
- self.sp = sp
-
- self.log.info("Output:")
- line = ''
- for line in iter(sp.stdout.readline, b''):
- line = line.decode(self.output_encoding).strip()
- self.log.info(line)
- sp.wait()
- self.log.info(
- "Command exited with return code %s",
- sp.returncode
- )
-
- if sp.returncode:
- raise AirflowException("Bash command failed")
-
- if self.xcom_push_flag:
- return line
+ logging.info("Temporary script location :{0}".format(script_location))
+ logging.info("Running command: " + self.bash_command)
+ self.sp = Popen(
+ ['bash', fname],
+ stdout=stdout_file,
+ stderr=stderr_file,
+ cwd=tmp_dir,
+ env=self.env,
+ preexec_fn=os.setsid)
+
+ self.sp.wait()
+
+ exit_msg = "Command exited with return code {0}".format(self.sp.returncode)
+ if self.sp.returncode:
+ stderr_output = None
+ with io.open(stderr_file.name, 'r+', encoding=self.output_encoding) as stderr_file_handle:
+ if os.path.getsize(stderr_file.name) > 0:
+ stderr_output = mmap.mmap(stderr_file_handle.fileno(), 0, access=mmap.ACCESS_READ)
+ raise AirflowException("Bash command failed, {0}, error: {1}"
+ .format(exit_msg, stderr_output))
+
+ logging.info(exit_msg)
+ output = None
+ if self.output_regex_filter or self.log_outout or self.xcom_push:
+ with io.open(stdout_file.name, 'r+', encoding=self.output_encoding) as stdout_file_handle:
+ if os.path.getsize(stdout_file_handle.name) > 0:
+ output = mmap.mmap(stdout_file_handle.fileno(), 0, access=mmap.ACCESS_READ)
+ if self.output_regex_filter:
+ pattern = self.output_regex_filter.encode("utf-8")
+ try:
+ re.compile(pattern)
+ except re.error:
+ raise AirflowException("command executed successfully, "
+ "but Invalid regex supplied {0} "
+ .format(self.output_regex_filter))
+
+ filtered_output = re.search(pattern, output)
+ if filtered_output:
+ return filtered_output.group().decode(self.output_encoding)
+ else:
+ logging.warning("failed to match on output based on "
+ "supplied regex : {0}"
+ .format(self.output_regex_filter))
+
+ if self.log_outout:
+ logging.info("stdout: {0}".format(output))
+
+ if self.xcom_push:
+ return output
+ else:
+ logging.warning("stdout file: {0} is empty".format(stdout_file.name))
+ else:
+ logging.warning("Not logging stdout for the command: {0}"
+ .format(self.bash_command))
def on_kill(self):
self.log.info('Sending SIGTERM signal to bash process group')
os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
-
diff --git a/tests/core.py b/tests/core.py
index 0c94137d15..c61d9a1fc2 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -60,6 +60,7 @@
from lxml import html
from airflow.exceptions import AirflowException
from airflow.configuration import AirflowConfigException, run_command
+from airflow.models import DAG, TaskInstance
from jinja2.sandbox import SecurityError
from jinja2 import UndefinedError
@@ -419,6 +420,25 @@ def test_bash_operator_multi_byte_output(self):
output_encoding='utf-8')
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ def test_bash_operator_multi_line_regexed_output(self):
+ t = BashOperator(
+ task_id='test_multi_line_bash_operator',
+ bash_command=r"echo 'this is first line. \n "
+ r"this is 2nd line, \n "
+ r"you can have n number of lines in output."
+ r"This is last line.'",
+ xcom_push=True,
+ output_regex_filter="\s?\w+\s?\d\w+ line,",
+ dag=self.dag,
+ output_encoding='utf-8')
+
+ ti = TaskInstance(task=t, execution_date=datetime.now())
+ ti.run()
+ expected_regex_matched_output = ' is 2nd line,'
+ self.assertEqual(ti.xcom_pull(task_ids='test_multi_line_bash_operator',
+ key='return_value'),
+ expected_regex_matched_output)
+
def test_bash_operator_kill(self):
import subprocess
import psutil
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services