Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/21 00:29:49 UTC

[airflow] branch master updated: Dataflow operators don't always create a virtualenv (#10373)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 88c7d2e  Dataflow operators don't always create a virtualenv (#10373)
88c7d2e is described below

commit 88c7d2e526af4994066f65f830e2fa8edcbbce2e
Author: Craig Chatfield <30...@users.noreply.github.com>
AuthorDate: Fri Aug 21 01:28:37 2020 +0100

    Dataflow operators don't always create a virtualenv (#10373)
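
    Previously DataflowCreatePythonJobOperator coerced py_requirements to an
    empty list, so DataflowHook built a virtualenv on every run, even when
    the user never asked for one. py_requirements now defaults to None and a
    virtualenv is only created when requirements are actually supplied. A
    minimal sketch of the resulting behavior (task ids and bucket paths are
    illustrative, not part of this commit):

        from airflow.providers.google.cloud.operators.dataflow import (
            DataflowCreatePythonJobOperator,
        )

        # py_requirements left as None: the configured interpreter is used
        # directly and no virtualenv is created.
        run_with_system_python = DataflowCreatePythonJobOperator(
            task_id="run_with_system_python",
            py_file="gs://example-bucket/pipeline.py",
        )

        # Non-empty py_requirements: a temporary virtualenv is built and the
        # listed packages are installed into it before the job is launched.
        run_in_virtualenv = DataflowCreatePythonJobOperator(
            task_id="run_in_virtualenv",
            py_file="gs://example-bucket/pipeline.py",
            py_requirements=["apache-beam[gcp]"],
        )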
---
 airflow/providers/google/cloud/hooks/dataflow.py   | 16 +++++-
 .../providers/google/cloud/operators/dataflow.py   |  4 +-
 .../providers/google/cloud/hooks/test_dataflow.py  | 62 ++++++++++++++++++++++
 .../google/cloud/operators/test_dataflow.py        |  2 +-
 4 files changed, 80 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py
index 7b21c2c..44dc877 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -24,6 +24,7 @@ import re
 import select
 import shlex
 import subprocess
+import textwrap
 import time
 import uuid
 import warnings
@@ -633,7 +634,7 @@ class DataflowHook(GoogleBaseHook):
         :param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
             See virtualenv documentation for more information.
 
-            This option is only relevant if the ``py_requirements`` parameter is passed.
+            This option is only relevant if the ``py_requirements`` parameter is not None.
         :type py_system_site_packages: bool
         :param append_job_name: True if unique suffix has to be appended to job name.
         :type append_job_name: bool
@@ -653,6 +654,19 @@ class DataflowHook(GoogleBaseHook):
                     for key, value in labels_dict.items()]
 
         if py_requirements is not None:
+            if not py_requirements and not py_system_site_packages:
+                warning_invalid_environment = textwrap.dedent(
+                    """\
+                    Invalid method invocation. You have disabled the inclusion of system packages and
+                    provided an empty list of packages to install, so it is not possible to create a
+                    valid virtual environment. The apache-beam package must be installed in the virtual \
+                    environment for your job to be executed. To fix this problem:
+                    * install apache-beam on the system, then set parameter py_system_site_packages to True,
+                    * add apache-beam to the list of required packages in parameter py_requirements.
+                    """
+                )
+                raise AirflowException(warning_invalid_environment)
+
             with TemporaryDirectory(prefix='dataflow-venv') as tmp_dir:
                 py_interpreter = prepare_virtualenv(
                     venv_directory=tmp_dir,
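
The guard above rejects the only combination that cannot yield a usable
environment: an empty requirements list with system site-packages excluded,
since apache-beam can never be importable inside such a virtualenv. A minimal
sketch of the failure mode from the caller's side (connection id, paths and
variables are illustrative; the method and argument names match the tests
later in this patch):

    from airflow.exceptions import AirflowException
    from airflow.providers.google.cloud.hooks.dataflow import DataflowHook

    hook = DataflowHook(gcp_conn_id="google_cloud_default")
    try:
        # project_id is resolved from the connection default, as in the tests.
        hook.start_python_dataflow(
            job_name="example-job",
            variables={"project": "example-project"},
            dataflow="/tmp/pipeline.py",
            py_options=[],
            py_requirements=[],             # empty list of packages ...
            py_system_site_packages=False,  # ... and no system packages
        )
    except AirflowException as err:
        # Fails fast instead of launching a job from a virtualenv
        # that lacks apache-beam.
        print(err)  # Invalid method invocation. ...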
diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py
index 85471e3..33adbac 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -470,7 +470,7 @@ class DataflowCreatePythonJobOperator(BaseOperator):
     :param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
         See virtualenv documentation for more information.
 
-        This option is only relevant if the ``py_requirements`` parameter is passed.
+        This option is only relevant if the ``py_requirements`` parameter is not None.
     :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
     :type gcp_conn_id: str
     :param project_id: Optional, the GCP project ID in which to start a job.
@@ -517,7 +517,7 @@ class DataflowCreatePythonJobOperator(BaseOperator):
         self.options.setdefault('labels', {}).update(
             {'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')})
         self.py_interpreter = py_interpreter
-        self.py_requirements = py_requirements or []
+        self.py_requirements = py_requirements
         self.py_system_site_packages = py_system_site_packages
         self.project_id = project_id
         self.location = location
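
The removed "or []" coercion was the crux of the operator change: it erased
the distinction between "no virtualenv requested" (None) and "virtualenv with
no extra packages" ([]). A standalone illustration of the two defaults (not
code from this commit):

    def normalize_old(py_requirements=None):
        return py_requirements or []  # None and [] both become []

    def normalize_new(py_requirements=None):
        return py_requirements        # None is preserved

    assert normalize_old(None) == []    # old: the hook always built a virtualenv
    assert normalize_new(None) is None  # new: the hook skips virtualenv creation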
diff --git a/tests/providers/google/cloud/hooks/test_dataflow.py b/tests/providers/google/cloud/hooks/test_dataflow.py
index 0b2ad7d..50b323a 100644
--- a/tests/providers/google/cloud/hooks/test_dataflow.py
+++ b/tests/providers/google/cloud/hooks/test_dataflow.py
@@ -314,6 +314,68 @@ class TestDataflowHook(unittest.TestCase):
         self.assertListEqual(sorted(mock_dataflow.call_args[1]["cmd"]),
                              sorted(expected_cmd))
 
+    @parameterized.expand([
+        (['foo-bar'], False),
+        (['foo-bar'], True),
+        ([], True),
+    ])
+    @mock.patch(DATAFLOW_STRING.format('prepare_virtualenv'))
+    @mock.patch(DATAFLOW_STRING.format('uuid.uuid4'))
+    @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
+    @mock.patch(DATAFLOW_STRING.format('_DataflowRunner'))
+    @mock.patch(DATAFLOW_STRING.format('DataflowHook.get_conn'))
+    def test_start_python_dataflow_with_non_empty_py_requirements_or_system_packages(
+        self,
+        current_py_requirements,
+        current_py_system_site_packages,
+        mock_conn,
+        mock_dataflow,
+        mock_dataflowjob,
+        mock_uuid,
+        mock_virtualenv,
+    ):
+        mock_uuid.return_value = MOCK_UUID
+        mock_conn.return_value = None
+        dataflow_instance = mock_dataflow.return_value
+        dataflow_instance.wait_for_done.return_value = None
+        dataflowjob_instance = mock_dataflowjob.return_value
+        dataflowjob_instance.wait_for_done.return_value = None
+        mock_virtualenv.return_value = '/dummy_dir/bin/python'
+        self.dataflow_hook.start_python_dataflow(  # pylint: disable=no-value-for-parameter
+            job_name=JOB_NAME, variables=DATAFLOW_VARIABLES_PY,
+            dataflow=PY_FILE, py_options=PY_OPTIONS,
+            py_requirements=current_py_requirements,
+            py_system_site_packages=current_py_system_site_packages
+        )
+        expected_cmd = ['/dummy_dir/bin/python', '-m', PY_FILE,
+                        '--region=us-central1',
+                        '--runner=DataflowRunner', '--project=test',
+                        '--labels=foo=bar',
+                        '--staging_location=gs://test/staging',
+                        '--job_name={}-{}'.format(JOB_NAME, MOCK_UUID)]
+        self.assertListEqual(sorted(mock_dataflow.call_args[1]["cmd"]),
+                             sorted(expected_cmd))
+
+    @mock.patch(DATAFLOW_STRING.format('uuid.uuid4'))
+    @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
+    @mock.patch(DATAFLOW_STRING.format('_DataflowRunner'))
+    @mock.patch(DATAFLOW_STRING.format('DataflowHook.get_conn'))
+    def test_start_python_dataflow_with_empty_py_requirements_and_without_system_packages(
+        self, mock_conn, mock_dataflow, mock_dataflowjob, mock_uuid
+    ):
+        mock_uuid.return_value = MOCK_UUID
+        mock_conn.return_value = None
+        dataflow_instance = mock_dataflow.return_value
+        dataflow_instance.wait_for_done.return_value = None
+        dataflowjob_instance = mock_dataflowjob.return_value
+        dataflowjob_instance.wait_for_done.return_value = None
+        with self.assertRaisesRegex(AirflowException, "Invalid method invocation."):
+            self.dataflow_hook.start_python_dataflow(  # pylint: disable=no-value-for-parameter
+                job_name=JOB_NAME, variables=DATAFLOW_VARIABLES_PY,
+                dataflow=PY_FILE, py_options=PY_OPTIONS,
+                py_requirements=[]
+            )
+
     @mock.patch(DATAFLOW_STRING.format('uuid.uuid4'))
     @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
     @mock.patch(DATAFLOW_STRING.format('_DataflowRunner'))
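
One subtlety in the parameterized test above is the argument order:
parameterized.expand supplies the tuple elements first, and each @mock.patch
decorator then appends its mock bottom-up. A self-contained illustration of
that ordering (the patch target is chosen only for the example):

    import os
    import unittest
    from unittest import mock

    from parameterized import parameterized

    class DecoratorOrderExample(unittest.TestCase):
        @parameterized.expand([(1, True)])
        @mock.patch("os.getcwd")
        def test_argument_order(self, number, flag, mock_getcwd):
            # number and flag come from parameterized.expand; mock_getcwd is
            # injected by the innermost (bottom-most) mock.patch decorator.
            mock_getcwd.return_value = "/dummy_dir"
            self.assertEqual((number, flag, os.getcwd()), (1, True, "/dummy_dir"))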
diff --git a/tests/providers/google/cloud/operators/test_dataflow.py b/tests/providers/google/cloud/operators/test_dataflow.py
index 71d589f..402fd04 100644
--- a/tests/providers/google/cloud/operators/test_dataflow.py
+++ b/tests/providers/google/cloud/operators/test_dataflow.py
@@ -114,7 +114,7 @@ class TestDataflowPythonOperator(unittest.TestCase):
             dataflow=mock.ANY,
             py_options=PY_OPTIONS,
             py_interpreter=PY_INTERPRETER,
-            py_requirements=[],
+            py_requirements=None,
             py_system_site_packages=False,
             on_new_job_id_callback=mock.ANY,
             project_id=None,