You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/16 09:54:41 UTC
[airflow] branch master updated: Add support for latest Apache Beam
SDK in Dataflow operators (#9323)
This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 639972d Add support for latest Apache Beam SDK in Dataflow operators (#9323)
639972d is described below
commit 639972d995d848b16a3f283576efdbde28b8fdef
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Tue Jun 16 11:53:37 2020 +0200
Add support for latest Apache Beam SDK in Dataflow operators (#9323)
* A
* Add support for Apache Beam latest SDK in Dataflow operators
* fixup! Add support for Apache Beam latest SDK in Dataflow operators
* fixup! fixup! Add support for Apache Beam latest SDK in Dataflow operators
* fixup! fixup! fixup! Add support for Apache Beam latest SDK in Dataflow operators
---
.../google/cloud/example_dags/example_dataflow.py | 21 +++++-
airflow/providers/google/cloud/hooks/dataflow.py | 85 +++++++++++-----------
.../providers/google/cloud/hooks/test_dataflow.py | 82 +++++++++++++++++++--
.../google/cloud/operators/test_dataflow_system.py | 12 ++-
4 files changed, 147 insertions(+), 53 deletions(-)
diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow.py b/airflow/providers/google/cloud/example_dags/example_dataflow.py
index b2aad96..d05318d 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataflow.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataflow.py
@@ -49,11 +49,11 @@ default_args = {
}
with models.DAG(
- "example_gcp_dataflow",
+ "example_gcp_dataflow_native_java",
default_args=default_args,
schedule_interval=None, # Override to match your needs
tags=['example'],
-) as dag:
+) as dag_native_java:
# [START howto_operator_start_java_job]
start_java_job = DataflowCreateJavaJobOperator(
@@ -90,6 +90,13 @@ with models.DAG(
)
jar_to_local >> start_java_job_local
+with models.DAG(
+ "example_gcp_dataflow_native_python",
+ default_args=default_args,
+ schedule_interval=None, # Override to match your needs
+ tags=['example'],
+) as dag_native_python:
+
# [START howto_operator_start_python_job]
start_python_job = DataflowCreatePythonJobOperator(
task_id="start-python-job",
@@ -100,7 +107,7 @@ with models.DAG(
'output': GCS_OUTPUT,
},
py_requirements=[
- 'apache-beam[gcp]>=2.14.0'
+ 'apache-beam[gcp]==2.21.0'
],
py_interpreter='python3',
py_system_site_packages=False,
@@ -117,12 +124,18 @@ with models.DAG(
'output': GCS_OUTPUT,
},
py_requirements=[
- 'apache-beam[gcp]>=2.14.0'
+ 'apache-beam[gcp]==2.14.0'
],
py_interpreter='python3',
py_system_site_packages=False
)
+with models.DAG(
+ "example_gcp_dataflow_template",
+ default_args=default_args,
+ schedule_interval=None, # Override to match your needs
+ tags=['example'],
+) as dag_template:
start_template_job = DataflowTemplatedJobStartOperator(
task_id="start-template-job",
template='gs://dataflow-templates/latest/Word_Count',
diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py
index 291ec2f..bf5b683 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -22,6 +22,7 @@ import functools
import json
import re
import select
+import shlex
import subprocess
import time
import uuid
@@ -42,10 +43,9 @@ from airflow.utils.python_virtualenv import prepare_virtualenv
DEFAULT_DATAFLOW_LOCATION = 'us-central1'
-# https://github.com/apache/beam/blob/75eee7857bb80a0cdb4ce99ae3e184101092e2ed/sdks/go/pkg/beam/runners/
-# universal/runnerlib/execute.go#L85
JOB_ID_PATTERN = re.compile(
- r'https?://console\.cloud\.google\.com/dataflow/jobsDetail/locations/.+?/jobs/([a-z|0-9|A-Z|\-|\_]+).*?')
+ r'Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]'
+)
RT = TypeVar('RT') # pylint: disable=invalid-name
@@ -319,8 +319,9 @@ class _DataflowRunner(LoggingMixin):
on_new_job_id_callback: Optional[Callable[[str], None]] = None
) -> None:
super().__init__()
- self.log.info("Running command: %s", ' '.join(cmd))
+ self.log.info("Running command: %s", ' '.join(shlex.quote(c) for c in cmd))
self.on_new_job_id_callback = on_new_job_id_callback
+ self.job_id: Optional[str] = None
self._proc = subprocess.Popen(
cmd,
shell=False,
@@ -328,39 +329,45 @@ class _DataflowRunner(LoggingMixin):
stderr=subprocess.PIPE,
close_fds=True)
- def _read_line_by_fd(self, fd):
- if fd == self._proc.stderr.fileno():
- line = self._proc.stderr.readline().decode()
- if line:
- self.log.warning(line[:-1])
- return line
+ def _process_fd(self, fd):
+ """
+ Prints output to logs and lookup for job ID in each line.
+
+ :param fd: File descriptor.
+ """
+ if fd == self._proc.stderr:
+ while True:
+ line = self._proc.stderr.readline().decode()
+ if not line:
+ return
+ self._process_line_and_extract_job_id(line)
+ self.log.warning(line.rstrip("\n"))
- if fd == self._proc.stdout.fileno():
- line = self._proc.stdout.readline().decode()
- if line:
- self.log.info(line[:-1])
- return line
+ if fd == self._proc.stdout:
+ while True:
+ line = self._proc.stdout.readline().decode()
+ if not line:
+ return
+ self._process_line_and_extract_job_id(line)
+ self.log.info(line.rstrip("\n"))
raise Exception("No data in stderr or in stdout.")
- def _extract_job(self, line: str) -> Optional[str]:
+ def _process_line_and_extract_job_id(self, line: str) -> None:
"""
Extracts job_id.
:param line: URL from which job_id has to be extracted
:type line: str
- :return: job_id or None if no match
- :rtype: Optional[str]
"""
# Job id info: https://goo.gl/SE29y9.
matched_job = JOB_ID_PATTERN.search(line)
if matched_job:
- job_id = matched_job.group(1)
+ job_id = matched_job.group('job_id_java') or matched_job.group('job_id_python')
self.log.info("Found Job ID: %s", job_id)
+ self.job_id = job_id
if self.on_new_job_id_callback:
self.on_new_job_id_callback(job_id)
- return job_id
- return None
def wait_for_done(self) -> Optional[str]:
"""
@@ -369,35 +376,31 @@ class _DataflowRunner(LoggingMixin):
:return: Job id
:rtype: Optional[str]
"""
- reads = [self._proc.stderr.fileno() if self._proc.stderr else 0,
- self._proc.stdout.fileno() if self._proc.stdout else 0]
self.log.info("Start waiting for DataFlow process to complete.")
- job_id = None
- # Make sure logs are processed regardless whether the subprocess is
- # terminated.
- process_ends = False
+ self.job_id = None
+ reads = [self._proc.stderr, self._proc.stdout]
while True:
# Wait for at least one available fd.
- readable_fbs, _, _ = select.select(reads, [], [], 5)
- if readable_fbs is None:
+ readable_fds, _, _ = select.select(reads, [], [], 5)
+ if readable_fds is None:
self.log.info("Waiting for DataFlow process to complete.")
continue
- # Read available fds.
- for readable_fb in readable_fbs:
- line = self._read_line_by_fd(readable_fb)
- if line and not job_id:
- job_id = job_id or self._extract_job(line)
+ for readable_fd in readable_fds:
+ self._process_fd(readable_fd)
- if process_ends:
- break
if self._proc.poll() is not None:
- # Mark process completion but allows its outputs to be consumed.
- process_ends = True
+ break
+
+ # Corner case: check if more output was created between the last read and the process termination
+ for readable_fd in reads:
+ self._process_fd(readable_fd)
+
+ self.log.info("Process exited with return code: %s", self._proc.returncode)
+
if self._proc.returncode != 0:
- raise Exception("DataFlow failed with return code {}".format(
- self._proc.returncode))
- return job_id
+ raise Exception("DataFlow failed with return code {}".format(self._proc.returncode))
+ return self.job_id
class DataflowHook(GoogleBaseHook):
diff --git a/tests/providers/google/cloud/hooks/test_dataflow.py b/tests/providers/google/cloud/hooks/test_dataflow.py
index f3c2b79..d3568f3 100644
--- a/tests/providers/google/cloud/hooks/test_dataflow.py
+++ b/tests/providers/google/cloud/hooks/test_dataflow.py
@@ -18,6 +18,7 @@
#
import copy
+import shlex
import unittest
from typing import Any, Dict
@@ -1026,14 +1027,83 @@ class TestDataflowJob(unittest.TestCase):
mock_batch.execute.assert_called_once()
+APACHE_BEAM_V_2_14_0_JAVA_SDK_LOG = f""""\
+Dataflow SDK version: 2.14.0
+Jun 15, 2020 2:57:28 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow\
+/jobsDetail/locations/europe-west3/jobs/{TEST_JOB_ID}?project=XXX
+Submitted job: {TEST_JOB_ID}
+Jun 15, 2020 2:57:28 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: To cancel the job using the 'gcloud' tool, run:
+> gcloud dataflow jobs --project=XXX cancel --region=europe-west3 {TEST_JOB_ID}
+"""
+
+APACHE_BEAM_V_2_22_0_JAVA_SDK_LOG = f""""\
+INFO: Dataflow SDK version: 2.22.0
+Jun 15, 2020 3:09:03 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow\
+/jobs/europe-west3/{TEST_JOB_ID}?project=XXXX
+Jun 15, 2020 3:09:03 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: Submitted job: {TEST_JOB_ID}
+Jun 15, 2020 3:09:03 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: To cancel the job using the 'gcloud' tool, run:
+> gcloud dataflow jobs --project=XXX cancel --region=europe-west3 {TEST_JOB_ID}
+"""
+
+APACHE_BEAM_V_2_14_0_PYTHON_SDK_LOG = f""""\
+INFO:root:Completed GCS upload to gs://test-dataflow-example/staging/start-python-job-local-5bcf3d71.\
+1592286375.000962/apache_beam-2.14.0-cp37-cp37m-manylinux1_x86_64.whl in 0 seconds.
+INFO:root:Create job: <Job
+ createTime: '2020-06-16T05:46:20.911857Z'
+ currentStateTime: '1970-01-01T00:00:00Z'
+ id: '{TEST_JOB_ID}'
+ location: 'us-central1'
+ name: 'start-python-job-local-5bcf3d71'
+ projectId: 'XXX'
+ stageStates: []
+ startTime: '2020-06-16T05:46:20.911857Z'
+ steps: []
+ tempFiles: []
+ type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
+INFO:root:Created job with id: [{TEST_JOB_ID}]
+INFO:root:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/\
+dataflow/jobsDetail/locations/us-central1/jobs/{TEST_JOB_ID}?project=XXX
+"""
+
+APACHE_BEAM_V_2_22_0_PYTHON_SDK_LOG = f""""\
+INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://test-dataflow-example/\
+staging/start-python-job-local-5bcf3d71.1592286719.303624/apache_beam-2.22.0-cp37-cp37m-manylinux1_x86_64.whl\
+ in 1 seconds.
+INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job
+ createTime: '2020-06-16T05:52:04.095216Z'
+ currentStateTime: '1970-01-01T00:00:00Z'
+ id: '{TEST_JOB_ID}'
+ location: 'us-central1'
+ name: 'start-python-job-local-5bcf3d71'
+ projectId: 'XXX'
+ stageStates: []
+ startTime: '2020-06-16T05:52:04.095216Z'
+ steps: []
+ tempFiles: []
+ type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
+INFO:apache_beam.runners.dataflow.internal.apiclient:Created job with id: [{TEST_JOB_ID}]
+INFO:apache_beam.runners.dataflow.internal.apiclient:Submitted job: {TEST_JOB_ID}
+INFO:apache_beam.runners.dataflow.internal.apiclient:To access the Dataflow monitoring console, please \
+navigate to https://console.cloud.google.com/dataflow/jobs/us-central1/{TEST_JOB_ID}?project=XXX
+"""
+
+
class TestDataflow(unittest.TestCase):
- def test_data_flow_valid_job_id(self):
- cmd = [
- 'echo', 'additional unit test lines.\n' +
- 'https://console.cloud.google.com/dataflow/jobsDetail/locations/us-central1/'
- 'jobs/{}?project=XXX'.format(TEST_JOB_ID)
- ]
+ @parameterized.expand([
+ (APACHE_BEAM_V_2_14_0_JAVA_SDK_LOG, ),
+ (APACHE_BEAM_V_2_22_0_JAVA_SDK_LOG, ),
+ (APACHE_BEAM_V_2_14_0_PYTHON_SDK_LOG, ),
+ (APACHE_BEAM_V_2_22_0_PYTHON_SDK_LOG, ),
+ ], name_func=lambda func, num, p: f"{func.__name__}_{num}")
+ def test_data_flow_valid_job_id(self, log):
+ echos = ";".join([f"echo {shlex.quote(line)}" for line in log.split("\n")])
+ cmd = ["bash", "-c", echos]
self.assertEqual(_DataflowRunner(cmd).wait_for_done(), TEST_JOB_ID)
def test_data_flow_missing_job_id(self):
diff --git a/tests/providers/google/cloud/operators/test_dataflow_system.py b/tests/providers/google/cloud/operators/test_dataflow_system.py
index 3a42cb1..9308d55 100644
--- a/tests/providers/google/cloud/operators/test_dataflow_system.py
+++ b/tests/providers/google/cloud/operators/test_dataflow_system.py
@@ -25,5 +25,13 @@ from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTe
@pytest.mark.credential_file(GCP_DATAFLOW_KEY)
class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
@provide_gcp_context(GCP_DATAFLOW_KEY)
- def test_run_example_dag_function(self):
- self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+ def test_run_example_gcp_dataflow_native_java(self):
+ self.run_dag('example_gcp_dataflow_native_java', CLOUD_DAG_FOLDER)
+
+ @provide_gcp_context(GCP_DATAFLOW_KEY)
+ def test_run_example_gcp_dataflow_native_python(self):
+ self.run_dag('example_gcp_dataflow_native_python', CLOUD_DAG_FOLDER)
+
+ @provide_gcp_context(GCP_DATAFLOW_KEY)
+ def test_run_example_gcp_dataflow_template(self):
+ self.run_dag('example_gcp_dataflow_template', CLOUD_DAG_FOLDER)