Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/21 00:29:49 UTC
[airflow] branch master updated: Dataflow operators don't always create a virtualenv (#10373)
This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 88c7d2e Dataflow operators don't always create a virtualenv (#10373)
88c7d2e is described below
commit 88c7d2e526af4994066f65f830e2fa8edcbbce2e
Author: Craig Chatfield <30...@users.noreply.github.com>
AuthorDate: Fri Aug 21 01:28:37 2020 +0100
Dataflow operators don't always create a virtualenv (#10373)
---
airflow/providers/google/cloud/hooks/dataflow.py | 16 +++++-
.../providers/google/cloud/operators/dataflow.py | 4 +-
.../providers/google/cloud/hooks/test_dataflow.py | 62 ++++++++++++++++++++++
.../google/cloud/operators/test_dataflow.py | 2 +-
4 files changed, 80 insertions(+), 4 deletions(-)
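In effect, ``py_requirements`` now defaults to ``None`` in the operator, so the hook only builds a virtualenv when requirements are explicitly passed, and it rejects the one combination that can never work: an empty requirements list with system site packages excluded. Below is a minimal sketch of the resulting control flow, assuming the Airflow helpers used in this tree; ``start_with_optional_virtualenv`` and the ``run_job`` callback are illustrative names, not part of the patch:

    from tempfile import TemporaryDirectory

    from airflow.exceptions import AirflowException
    from airflow.utils.python_virtualenv import prepare_virtualenv

    def start_with_optional_virtualenv(py_interpreter, py_requirements,
                                       py_system_site_packages, run_job):
        # run_job(interpreter) stands in for launching the Dataflow process.
        if py_requirements is not None:
            if not py_requirements and not py_system_site_packages:
                # An empty list with system packages excluded can never yield
                # a virtualenv that contains apache-beam, so fail fast.
                raise AirflowException("Invalid method invocation. ...")
            with TemporaryDirectory(prefix='dataflow-venv') as tmp_dir:
                interpreter = prepare_virtualenv(
                    venv_directory=tmp_dir,
                    python_bin=py_interpreter,
                    system_site_packages=py_system_site_packages,
                    requirements=py_requirements,
                )
                # The virtualenv only lives for the duration of the job.
                run_job(interpreter)
        else:
            # No requirements passed: no virtualenv is created at all.
            run_job(py_interpreter)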
diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py
index 7b21c2c..44dc877 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -24,6 +24,7 @@ import re
import select
import shlex
import subprocess
+import textwrap
import time
import uuid
import warnings
@@ -633,7 +634,7 @@ class DataflowHook(GoogleBaseHook):
:param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
See virtualenv documentation for more information.
- This option is only relevant if the ``py_requirements`` parameter is passed.
+ This option is only relevant if the ``py_requirements`` parameter is not None.
:type py_interpreter: str
:param append_job_name: True if unique suffix has to be appended to job name.
:type append_job_name: bool
@@ -653,6 +654,19 @@ class DataflowHook(GoogleBaseHook):
for key, value in labels_dict.items()]
if py_requirements is not None:
+ if not py_requirements and not py_system_site_packages:
+ warning_invalid_environment = textwrap.dedent(
+ """\
+ Invalid method invocation. You have disabled inclusion of system packages, and the
+ list of packages required for installation is empty, so it is not possible to
+ create a valid virtual environment. In the virtual environment, the apache-beam
+ package must be installed for your job to be executed. To fix this problem:
+ * install apache-beam on the system, then set parameter py_system_site_packages to True,
+ * add apache-beam to the list of required packages in parameter py_requirements.
+ """
+ )
+ raise AirflowException(warning_invalid_environment)
+
with TemporaryDirectory(prefix='dataflow-venv') as tmp_dir:
py_interpreter = prepare_virtualenv(
venv_directory=tmp_dir,
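For quick reference, the parameter combinations the hook now handles (a summary inferred from the hunk above and the tests below, not text from the patch):

    # py_requirements=None                        -> no virtualenv; py_interpreter used as-is
    # py_requirements=['foo-bar'], site pkgs off  -> virtualenv with foo-bar installed
    # py_requirements=['foo-bar'], site pkgs on   -> virtualenv with foo-bar plus system packages
    # py_requirements=[],          site pkgs on   -> virtualenv reusing system packages only
    # py_requirements=[],          site pkgs off  -> AirflowException: apache-beam can never be present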
diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py
index 85471e3..33adbac 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -470,7 +470,7 @@ class DataflowCreatePythonJobOperator(BaseOperator):
:param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
See virtualenv documentation for more information.
- This option is only relevant if the ``py_requirements`` parameter is passed.
+ This option is only relevant if the ``py_requirements`` parameter is not None.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
:type gcp_conn_id: str
:param project_id: Optional, the GCP project ID in which to start a job.
@@ -517,7 +517,7 @@ class DataflowCreatePythonJobOperator(BaseOperator):
self.options.setdefault('labels', {}).update(
{'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')})
self.py_interpreter = py_interpreter
- self.py_requirements = py_requirements or []
+ self.py_requirements = py_requirements
self.py_system_site_packages = py_system_site_packages
self.project_id = project_id
self.location = location
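From the caller's side, the operator change means a DAG author opts in to the virtualenv explicitly. A hedged usage sketch (the task ids and GCS path are illustrative, not from the patch):

    from airflow.providers.google.cloud.operators.dataflow import (
        DataflowCreatePythonJobOperator,
    )

    # py_requirements defaults to None after this commit, so no virtualenv
    # is created and the configured interpreter runs the job directly.
    run_on_system = DataflowCreatePythonJobOperator(
        task_id='dataflow-system-python',
        py_file='gs://my-bucket/wordcount.py',
    )

    # Passing a non-empty list still builds a throwaway virtualenv and
    # installs the listed packages into it before launching the job.
    run_in_venv = DataflowCreatePythonJobOperator(
        task_id='dataflow-venv-python',
        py_file='gs://my-bucket/wordcount.py',
        py_requirements=['apache-beam[gcp]'],
    )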
diff --git a/tests/providers/google/cloud/hooks/test_dataflow.py b/tests/providers/google/cloud/hooks/test_dataflow.py
index 0b2ad7d..50b323a 100644
--- a/tests/providers/google/cloud/hooks/test_dataflow.py
+++ b/tests/providers/google/cloud/hooks/test_dataflow.py
@@ -314,6 +314,68 @@ class TestDataflowHook(unittest.TestCase):
self.assertListEqual(sorted(mock_dataflow.call_args[1]["cmd"]),
sorted(expected_cmd))
+ @parameterized.expand([
+ (['foo-bar'], False),
+ (['foo-bar'], True),
+ ([], True),
+ ])
+ @mock.patch(DATAFLOW_STRING.format('prepare_virtualenv'))
+ @mock.patch(DATAFLOW_STRING.format('uuid.uuid4'))
+ @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
+ @mock.patch(DATAFLOW_STRING.format('_DataflowRunner'))
+ @mock.patch(DATAFLOW_STRING.format('DataflowHook.get_conn'))
+ def test_start_python_dataflow_with_valid_py_requirements_combinations(
+ self,
+ current_py_requirements,
+ current_py_system_site_packages,
+ mock_conn,
+ mock_dataflow,
+ mock_dataflowjob,
+ mock_uuid,
+ mock_virtualenv,
+ ):
+ mock_uuid.return_value = MOCK_UUID
+ mock_conn.return_value = None
+ dataflow_instance = mock_dataflow.return_value
+ dataflow_instance.wait_for_done.return_value = None
+ dataflowjob_instance = mock_dataflowjob.return_value
+ dataflowjob_instance.wait_for_done.return_value = None
+ mock_virtualenv.return_value = '/dummy_dir/bin/python'
+ self.dataflow_hook.start_python_dataflow( # pylint: disable=no-value-for-parameter
+ job_name=JOB_NAME, variables=DATAFLOW_VARIABLES_PY,
+ dataflow=PY_FILE, py_options=PY_OPTIONS,
+ py_requirements=current_py_requirements,
+ py_system_site_packages=current_py_system_site_packages
+ )
+ expected_cmd = ['/dummy_dir/bin/python', '-m', PY_FILE,
+ '--region=us-central1',
+ '--runner=DataflowRunner', '--project=test',
+ '--labels=foo=bar',
+ '--staging_location=gs://test/staging',
+ '--job_name={}-{}'.format(JOB_NAME, MOCK_UUID)]
+ self.assertListEqual(sorted(mock_dataflow.call_args[1]["cmd"]),
+ sorted(expected_cmd))
+
+ @mock.patch(DATAFLOW_STRING.format('uuid.uuid4'))
+ @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
+ @mock.patch(DATAFLOW_STRING.format('_DataflowRunner'))
+ @mock.patch(DATAFLOW_STRING.format('DataflowHook.get_conn'))
+ def test_start_python_dataflow_with_empty_py_requirements_and_without_system_packages(
+ self, mock_conn, mock_dataflow, mock_dataflowjob, mock_uuid
+ ):
+ mock_uuid.return_value = MOCK_UUID
+ mock_conn.return_value = None
+ dataflow_instance = mock_dataflow.return_value
+ dataflow_instance.wait_for_done.return_value = None
+ dataflowjob_instance = mock_dataflowjob.return_value
+ dataflowjob_instance.wait_for_done.return_value = None
+ with self.assertRaisesRegex(AirflowException, "Invalid method invocation."):
+ self.dataflow_hook.start_python_dataflow( # pylint: disable=no-value-for-parameter
+ job_name=JOB_NAME, variables=DATAFLOW_VARIABLES_PY,
+ dataflow=PY_FILE, py_options=PY_OPTIONS,
+ py_requirements=[]
+ )
+
@mock.patch(DATAFLOW_STRING.format('uuid.uuid4'))
@mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
@mock.patch(DATAFLOW_STRING.format('_DataflowRunner'))
diff --git a/tests/providers/google/cloud/operators/test_dataflow.py b/tests/providers/google/cloud/operators/test_dataflow.py
index 71d589f..402fd04 100644
--- a/tests/providers/google/cloud/operators/test_dataflow.py
+++ b/tests/providers/google/cloud/operators/test_dataflow.py
@@ -114,7 +114,7 @@ class TestDataflowPythonOperator(unittest.TestCase):
dataflow=mock.ANY,
py_options=PY_OPTIONS,
py_interpreter=PY_INTERPRETER,
- py_requirements=[],
+ py_requirements=None,
py_system_site_packages=False,
on_new_job_id_callback=mock.ANY,
project_id=None,