You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/27 17:38:40 UTC

[GitHub] kaxil closed pull request #4367: [AIRFLOW-3551] Improve BashOperator Test Coverage

kaxil closed pull request #4367: [AIRFLOW-3551] Improve BashOperator Test Coverage
URL: https://github.com/apache/incubator-airflow/pull/4367
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the diff is reproduced below:

diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index 13aa44fc85..a2217adf40 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -20,11 +20,10 @@
 
 import os
 import signal
+from builtins import bytes
 from subprocess import Popen, STDOUT, PIPE
 from tempfile import gettempdir, NamedTemporaryFile
 
-from builtins import bytes
-
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -66,13 +65,12 @@ class BashOperator(BaseOperator):
     ui_color = '#f0ede4'
 
     @apply_defaults
-    def __init__(
-            self,
-            bash_command,
-            xcom_push=False,
-            env=None,
-            output_encoding='utf-8',
-            *args, **kwargs):
+    def __init__(self,
+                 bash_command,
+                 xcom_push=False,
+                 env=None,
+                 output_encoding='utf-8',
+                 *args, **kwargs):
 
         super(BashOperator, self).__init__(*args, **kwargs)
         self.bash_command = bash_command
@@ -85,14 +83,14 @@ def execute(self, context):
         Execute the bash command in a temporary directory
         which will be cleaned afterwards
         """
-        self.log.info("Tmp dir root location: \n %s", gettempdir())
+        self.log.info('Tmp dir root location: \n %s', gettempdir())
 
         # Prepare env for child process.
         if self.env is None:
             self.env = os.environ.copy()
-        airflow_context_vars = context_to_airflow_vars(context,
-                                                       in_env_var_format=True)
-        self.log.info("Exporting the following env vars:\n" +
+
+        airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
+        self.log.info('Exporting the following env vars:\n' +
                       '\n'.join(["{}={}".format(k, v)
                                  for k, v in
                                  airflow_context_vars.items()]))
@@ -101,16 +99,11 @@ def execute(self, context):
         self.lineage_data = self.bash_command
 
         with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
-            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
-
-                f.write(bytes(self.bash_command, 'utf_8'))
-                f.flush()
-                fname = f.name
-                script_location = os.path.abspath(fname)
-                self.log.info(
-                    "Temporary script location: %s",
-                    script_location
-                )
+            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as tmp_file:
+                tmp_file.write(bytes(self.bash_command, 'utf_8'))
+                tmp_file.flush()
+                script_location = os.path.abspath(tmp_file.name)
+                self.log.info('Temporary script location: %s', script_location)
 
                 def pre_exec():
                     # Restore default signal disposition and invoke setsid
@@ -119,32 +112,33 @@ def pre_exec():
                             signal.signal(getattr(signal, sig), signal.SIG_DFL)
                     os.setsid()
 
-                self.log.info("Running command: %s", self.bash_command)
-                sp = Popen(
-                    ['bash', fname],
-                    stdout=PIPE, stderr=STDOUT,
-                    cwd=tmp_dir, env=self.env,
+                self.log.info('Running command: %s', self.bash_command)
+                sub_process = Popen(
+                    ['bash', tmp_file.name],
+                    stdout=PIPE,
+                    stderr=STDOUT,
+                    cwd=tmp_dir,
+                    env=self.env,
                     preexec_fn=pre_exec)
 
-                self.sp = sp
+                self.sub_process = sub_process
 
-                self.log.info("Output:")
+                self.log.info('Output:')
                 line = ''
-                for line in iter(sp.stdout.readline, b''):
-                    line = line.decode(self.output_encoding).rstrip()
+                for raw_line in iter(sub_process.stdout.readline, b''):
+                    line = raw_line.decode(self.output_encoding).rstrip()
                     self.log.info(line)
-                sp.wait()
-                self.log.info(
-                    "Command exited with return code %s",
-                    sp.returncode
-                )
 
-                if sp.returncode:
-                    raise AirflowException("Bash command failed")
+                sub_process.wait()
+
+                self.log.info('Command exited with return code %s', sub_process.returncode)
+
+                if sub_process.returncode:
+                    raise AirflowException('Bash command failed')
 
         if self.xcom_push_flag:
             return line
 
     def on_kill(self):
         self.log.info('Sending SIGTERM signal to bash process group')
-        os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
+        os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)
diff --git a/tests/operators/test_bash_operator.py b/tests/operators/test_bash_operator.py
index 8f55b9cda1..e298682734 100644
--- a/tests/operators/test_bash_operator.py
+++ b/tests/operators/test_bash_operator.py
@@ -15,6 +15,7 @@
 import os
 import unittest
 from datetime import datetime, timedelta
+from tempfile import NamedTemporaryFile
 
 from airflow import DAG
 from airflow.models import State
@@ -26,7 +27,8 @@
 INTERVAL = timedelta(hours=12)
 
 
-class BashOperatorTestCase(unittest.TestCase):
+class BashOperatorTest(unittest.TestCase):
+
     def test_echo_env_variables(self):
         """
         Test that env variables are exported correctly to the
@@ -52,10 +54,8 @@ def test_echo_env_variables(self):
             external_trigger=False,
         )
 
-        import tempfile
-        with tempfile.NamedTemporaryFile() as f:
-            fname = f.name
-            t = BashOperator(
+        with NamedTemporaryFile() as tmp_file:
+            task = BashOperator(
                 task_id='echo_env_vars',
                 dag=self.dag,
                 bash_command='echo $AIRFLOW_HOME>> {0};'
@@ -63,17 +63,17 @@ def test_echo_env_variables(self):
                              'echo $AIRFLOW_CTX_DAG_ID >> {0};'
                              'echo $AIRFLOW_CTX_TASK_ID>> {0};'
                              'echo $AIRFLOW_CTX_EXECUTION_DATE>> {0};'
-                             'echo $AIRFLOW_CTX_DAG_RUN_ID>> {0};'.format(fname)
+                             'echo $AIRFLOW_CTX_DAG_RUN_ID>> {0};'.format(tmp_file.name)
             )
 
             original_AIRFLOW_HOME = os.environ['AIRFLOW_HOME']
 
             os.environ['AIRFLOW_HOME'] = 'MY_PATH_TO_AIRFLOW_HOME'
-            t.run(DEFAULT_DATE, DEFAULT_DATE,
-                  ignore_first_depends_on_past=True, ignore_ti_state=True)
+            task.run(DEFAULT_DATE, DEFAULT_DATE,
+                     ignore_first_depends_on_past=True, ignore_ti_state=True)
 
-            with open(fname, 'r') as fr:
-                output = ''.join(fr.readlines())
+            with open(tmp_file.name, 'r') as file:
+                output = ''.join(file.readlines())
                 self.assertIn('MY_PATH_TO_AIRFLOW_HOME', output)
                 # exported in run_unit_tests.sh as part of PYTHONPATH
                 self.assertIn('tests/test_utils', output)
@@ -83,3 +83,14 @@ def test_echo_env_variables(self):
                 self.assertIn('manual__' + DEFAULT_DATE.isoformat(), output)
 
             os.environ['AIRFLOW_HOME'] = original_AIRFLOW_HOME
+
+    def test_return_value_to_xcom(self):
+        bash_operator = BashOperator(
+            bash_command='echo "stdout"',
+            xcom_push=True,
+            task_id='test_return_value_to_xcom',
+            dag=None
+        )
+        xcom_return_value = bash_operator.execute(context={})
+
+        self.assertEqual(xcom_return_value, u'stdout')


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services