Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/16 09:54:41 UTC

[airflow] branch master updated: Add support for latest Apache Beam SDK in Dataflow operators (#9323)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 639972d  Add support for latest Apache Beam SDK in Dataflow operators (#9323)
639972d is described below

commit 639972d995d848b16a3f283576efdbde28b8fdef
Author: Kamil BreguĊ‚a <mi...@users.noreply.github.com>
AuthorDate: Tue Jun 16 11:53:37 2020 +0200

    Add support for latest Apache Beam SDK in Dataflow operators (#9323)
    
    * A
    
    * Add support for Apache Beam latest SDK in Dataflow operators
    
    * fixup! Add support for Apache Beam latest SDK in Dataflow operators
    
    * fixup! fixup! Add support for Apache Beam latest SDK in Dataflow operators
    
    * fixup! fixup! fixup! Add support for Apache Beam latest SDK in Dataflow operators
---
 .../google/cloud/example_dags/example_dataflow.py  | 21 +++++-
 airflow/providers/google/cloud/hooks/dataflow.py   | 85 +++++++++++-----------
 .../providers/google/cloud/hooks/test_dataflow.py  | 82 +++++++++++++++++++--
 .../google/cloud/operators/test_dataflow_system.py | 12 ++-
 4 files changed, 147 insertions(+), 53 deletions(-)

diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow.py b/airflow/providers/google/cloud/example_dags/example_dataflow.py
index b2aad96..d05318d 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataflow.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataflow.py
@@ -49,11 +49,11 @@ default_args = {
 }
 
 with models.DAG(
-    "example_gcp_dataflow",
+    "example_gcp_dataflow_native_java",
     default_args=default_args,
     schedule_interval=None,  # Override to match your needs
     tags=['example'],
-) as dag:
+) as dag_native_java:
 
     # [START howto_operator_start_java_job]
     start_java_job = DataflowCreateJavaJobOperator(
@@ -90,6 +90,13 @@ with models.DAG(
     )
     jar_to_local >> start_java_job_local
 
+with models.DAG(
+    "example_gcp_dataflow_native_python",
+    default_args=default_args,
+    schedule_interval=None,  # Override to match your needs
+    tags=['example'],
+) as dag_native_python:
+
     # [START howto_operator_start_python_job]
     start_python_job = DataflowCreatePythonJobOperator(
         task_id="start-python-job",
@@ -100,7 +107,7 @@ with models.DAG(
             'output': GCS_OUTPUT,
         },
         py_requirements=[
-            'apache-beam[gcp]>=2.14.0'
+            'apache-beam[gcp]==2.21.0'
         ],
         py_interpreter='python3',
         py_system_site_packages=False,
@@ -117,12 +124,18 @@ with models.DAG(
             'output': GCS_OUTPUT,
         },
         py_requirements=[
-            'apache-beam[gcp]>=2.14.0'
+            'apache-beam[gcp]==2.14.0'
         ],
         py_interpreter='python3',
         py_system_site_packages=False
     )
 
+with models.DAG(
+    "example_gcp_dataflow_template",
+    default_args=default_args,
+    schedule_interval=None,  # Override to match your needs
+    tags=['example'],
+) as dag_template:
     start_template_job = DataflowTemplatedJobStartOperator(
         task_id="start-template-job",
         template='gs://dataflow-templates/latest/Word_Count',
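Note: the example file is split so that the native Java, native Python, and templated scenarios each live in their own DAG and can be run independently by the system tests further below. A minimal sketch of the Python variant under the same assumptions as the example file (default_args and GCS_OUTPUT are defined earlier in the file; GCS_PYTHON is an assumed GCS path to the pipeline source):

    from airflow import models
    from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator

    with models.DAG(
        "example_gcp_dataflow_native_python",
        default_args=default_args,              # assumed defined earlier in the example file
        schedule_interval=None,                 # override to match your needs
        tags=['example'],
    ) as dag_native_python:
        start_python_job = DataflowCreatePythonJobOperator(
            task_id="start-python-job",
            py_file=GCS_PYTHON,                 # assumed GCS path to the pipeline file
            options={'output': GCS_OUTPUT},
            py_requirements=['apache-beam[gcp]==2.21.0'],  # pin the Beam SDK the operator installs
            py_interpreter='python3',
            py_system_site_packages=False,
        )
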
diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py
index 291ec2f..bf5b683 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -22,6 +22,7 @@ import functools
 import json
 import re
 import select
+import shlex
 import subprocess
 import time
 import uuid
@@ -42,10 +43,9 @@ from airflow.utils.python_virtualenv import prepare_virtualenv
 DEFAULT_DATAFLOW_LOCATION = 'us-central1'
 
 
-# https://github.com/apache/beam/blob/75eee7857bb80a0cdb4ce99ae3e184101092e2ed/sdks/go/pkg/beam/runners/
-# universal/runnerlib/execute.go#L85
 JOB_ID_PATTERN = re.compile(
-    r'https?://console\.cloud\.google\.com/dataflow/jobsDetail/locations/.+?/jobs/([a-z|0-9|A-Z|\-|\_]+).*?')
+    r'Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]'
+)
 
 RT = TypeVar('RT')  # pylint: disable=invalid-name
 
@@ -319,8 +319,9 @@ class _DataflowRunner(LoggingMixin):
         on_new_job_id_callback: Optional[Callable[[str], None]] = None
     ) -> None:
         super().__init__()
-        self.log.info("Running command: %s", ' '.join(cmd))
+        self.log.info("Running command: %s", ' '.join(shlex.quote(c) for c in cmd))
         self.on_new_job_id_callback = on_new_job_id_callback
+        self.job_id: Optional[str] = None
         self._proc = subprocess.Popen(
             cmd,
             shell=False,
@@ -328,39 +329,45 @@ class _DataflowRunner(LoggingMixin):
             stderr=subprocess.PIPE,
             close_fds=True)
 
-    def _read_line_by_fd(self, fd):
-        if fd == self._proc.stderr.fileno():
-            line = self._proc.stderr.readline().decode()
-            if line:
-                self.log.warning(line[:-1])
-            return line
+    def _process_fd(self, fd):
+        """
+        Prints output to the logs and looks for a job ID in each line.
+
+        :param fd: File descriptor.
+        """
+        if fd == self._proc.stderr:
+            while True:
+                line = self._proc.stderr.readline().decode()
+                if not line:
+                    return
+                self._process_line_and_extract_job_id(line)
+                self.log.warning(line.rstrip("\n"))
 
-        if fd == self._proc.stdout.fileno():
-            line = self._proc.stdout.readline().decode()
-            if line:
-                self.log.info(line[:-1])
-            return line
+        if fd == self._proc.stdout:
+            while True:
+                line = self._proc.stdout.readline().decode()
+                if not line:
+                    return
+                self._process_line_and_extract_job_id(line)
+                self.log.info(line.rstrip("\n"))
 
         raise Exception("No data in stderr or in stdout.")
 
-    def _extract_job(self, line: str) -> Optional[str]:
+    def _process_line_and_extract_job_id(self, line: str) -> None:
         """
         Extracts job_id.
 
         :param line: URL from which job_id has to be extracted
         :type line: str
-        :return: job_id or None if no match
-        :rtype: Optional[str]
         """
         # Job id info: https://goo.gl/SE29y9.
         matched_job = JOB_ID_PATTERN.search(line)
         if matched_job:
-            job_id = matched_job.group(1)
+            job_id = matched_job.group('job_id_java') or matched_job.group('job_id_python')
             self.log.info("Found Job ID: %s", job_id)
+            self.job_id = job_id
             if self.on_new_job_id_callback:
                 self.on_new_job_id_callback(job_id)
-            return job_id
-        return None
 
     def wait_for_done(self) -> Optional[str]:
         """
@@ -369,35 +376,31 @@ class _DataflowRunner(LoggingMixin):
         :return: Job id
         :rtype: Optional[str]
         """
-        reads = [self._proc.stderr.fileno() if self._proc.stderr else 0,
-                 self._proc.stdout.fileno() if self._proc.stdout else 0]
         self.log.info("Start waiting for DataFlow process to complete.")
-        job_id = None
-        # Make sure logs are processed regardless whether the subprocess is
-        # terminated.
-        process_ends = False
+        self.job_id = None
+        reads = [self._proc.stderr, self._proc.stdout]
         while True:
             # Wait for at least one available fd.
-            readable_fbs, _, _ = select.select(reads, [], [], 5)
-            if readable_fbs is None:
+            readable_fds, _, _ = select.select(reads, [], [], 5)
+            if readable_fds is None:
                 self.log.info("Waiting for DataFlow process to complete.")
                 continue
 
-            # Read available fds.
-            for readable_fb in readable_fbs:
-                line = self._read_line_by_fd(readable_fb)
-                if line and not job_id:
-                    job_id = job_id or self._extract_job(line)
+            for readable_fd in readable_fds:
+                self._process_fd(readable_fd)
 
-            if process_ends:
-                break
             if self._proc.poll() is not None:
-                # Mark process completion but allows its outputs to be consumed.
-                process_ends = True
+                break
+
+        # Corner case: check if more output was created between the last read and the process termination
+        for readable_fd in reads:
+            self._process_fd(readable_fd)
+
+        self.log.info("Process exited with return code: %s", self._proc.returncode)
+
         if self._proc.returncode != 0:
-            raise Exception("DataFlow failed with return code {}".format(
-                self._proc.returncode))
-        return job_id
+            raise Exception("DataFlow failed with return code {}".format(self._proc.returncode))
+        return self.job_id
 
 
 class DataflowHook(GoogleBaseHook):
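For reference, a minimal standalone sketch of the new job-ID extraction: the Java SDK logs "Submitted job: <id>" while the Python SDK logs "Created job with id: [<id>]", and both are covered by the single pattern added above. extract_job_id is a hypothetical helper (not part of the hook) and the IDs below are placeholder sample values:

    import re
    from typing import Optional

    # Same pattern as JOB_ID_PATTERN in the hook above.
    JOB_ID_PATTERN = re.compile(
        r'Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]'
    )

    def extract_job_id(line: str) -> Optional[str]:
        """Return the Dataflow job ID if the log line contains one, otherwise None."""
        matched = JOB_ID_PATTERN.search(line)
        if not matched:
            return None
        return matched.group('job_id_java') or matched.group('job_id_python')

    assert extract_job_id("INFO: Submitted job: 2020-06-15_sample_id") == "2020-06-15_sample_id"
    assert extract_job_id("INFO:root:Created job with id: [2020-06-15_sample_id]") == "2020-06-15_sample_id"
    assert extract_job_id("unrelated output") is None
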
diff --git a/tests/providers/google/cloud/hooks/test_dataflow.py b/tests/providers/google/cloud/hooks/test_dataflow.py
index f3c2b79..d3568f3 100644
--- a/tests/providers/google/cloud/hooks/test_dataflow.py
+++ b/tests/providers/google/cloud/hooks/test_dataflow.py
@@ -18,6 +18,7 @@
 #
 
 import copy
+import shlex
 import unittest
 from typing import Any, Dict
 
@@ -1026,14 +1027,83 @@ class TestDataflowJob(unittest.TestCase):
         mock_batch.execute.assert_called_once()
 
 
+APACHE_BEAM_V_2_14_0_JAVA_SDK_LOG = f""""\
+Dataflow SDK version: 2.14.0
+Jun 15, 2020 2:57:28 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow\
+/jobsDetail/locations/europe-west3/jobs/{TEST_JOB_ID}?project=XXX
+Submitted job: {TEST_JOB_ID}
+Jun 15, 2020 2:57:28 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: To cancel the job using the 'gcloud' tool, run:
+> gcloud dataflow jobs --project=XXX cancel --region=europe-west3 {TEST_JOB_ID}
+"""
+
+APACHE_BEAM_V_2_22_0_JAVA_SDK_LOG = f""""\
+INFO: Dataflow SDK version: 2.22.0
+Jun 15, 2020 3:09:03 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow\
+/jobs/europe-west3/{TEST_JOB_ID}?project=XXXX
+Jun 15, 2020 3:09:03 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: Submitted job: {TEST_JOB_ID}
+Jun 15, 2020 3:09:03 PM org.apache.beam.runners.dataflow.DataflowRunner run
+INFO: To cancel the job using the 'gcloud' tool, run:
+> gcloud dataflow jobs --project=XXX cancel --region=europe-west3 {TEST_JOB_ID}
+"""
+
+APACHE_BEAM_V_2_14_0_PYTHON_SDK_LOG = f""""\
+INFO:root:Completed GCS upload to gs://test-dataflow-example/staging/start-python-job-local-5bcf3d71.\
+1592286375.000962/apache_beam-2.14.0-cp37-cp37m-manylinux1_x86_64.whl in 0 seconds.
+INFO:root:Create job: <Job
+ createTime: '2020-06-16T05:46:20.911857Z'
+ currentStateTime: '1970-01-01T00:00:00Z'
+ id: '{TEST_JOB_ID}'
+ location: 'us-central1'
+ name: 'start-python-job-local-5bcf3d71'
+ projectId: 'XXX'
+ stageStates: []
+ startTime: '2020-06-16T05:46:20.911857Z'
+ steps: []
+ tempFiles: []
+ type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
+INFO:root:Created job with id: [{TEST_JOB_ID}]
+INFO:root:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/\
+dataflow/jobsDetail/locations/us-central1/jobs/{TEST_JOB_ID}?project=XXX
+"""
+
+APACHE_BEAM_V_2_22_0_PYTHON_SDK_LOG = f""""\
+INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://test-dataflow-example/\
+staging/start-python-job-local-5bcf3d71.1592286719.303624/apache_beam-2.22.0-cp37-cp37m-manylinux1_x86_64.whl\
+ in 1 seconds.
+INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job
+ createTime: '2020-06-16T05:52:04.095216Z'
+ currentStateTime: '1970-01-01T00:00:00Z'
+ id: '{TEST_JOB_ID}'
+ location: 'us-central1'
+ name: 'start-python-job-local-5bcf3d71'
+ projectId: 'XXX'
+ stageStates: []
+ startTime: '2020-06-16T05:52:04.095216Z'
+ steps: []
+ tempFiles: []
+ type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
+INFO:apache_beam.runners.dataflow.internal.apiclient:Created job with id: [{TEST_JOB_ID}]
+INFO:apache_beam.runners.dataflow.internal.apiclient:Submitted job: {TEST_JOB_ID}
+INFO:apache_beam.runners.dataflow.internal.apiclient:To access the Dataflow monitoring console, please \
+navigate to https://console.cloud.google.com/dataflow/jobs/us-central1/{TEST_JOB_ID}?project=XXX
+"""
+
+
 class TestDataflow(unittest.TestCase):
 
-    def test_data_flow_valid_job_id(self):
-        cmd = [
-            'echo', 'additional unit test lines.\n' +
-            'https://console.cloud.google.com/dataflow/jobsDetail/locations/us-central1/'
-            'jobs/{}?project=XXX'.format(TEST_JOB_ID)
-        ]
+    @parameterized.expand([
+        (APACHE_BEAM_V_2_14_0_JAVA_SDK_LOG, ),
+        (APACHE_BEAM_V_2_22_0_JAVA_SDK_LOG, ),
+        (APACHE_BEAM_V_2_14_0_PYTHON_SDK_LOG, ),
+        (APACHE_BEAM_V_2_22_0_PYTHON_SDK_LOG, ),
+    ], name_func=lambda func, num, p: f"{func.__name__}_{num}")
+    def test_data_flow_valid_job_id(self, log):
+        echos = ";".join([f"echo {shlex.quote(line)}" for line in log.split("\n")])
+        cmd = ["bash", "-c", echos]
         self.assertEqual(_DataflowRunner(cmd).wait_for_done(), TEST_JOB_ID)
 
     def test_data_flow_missing_job_id(self):
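The parameterized test replays each captured SDK log through the runner by echoing it from a bash subprocess; the same trick can be used interactively to check a log snippet against the runner (the sample line and job ID below are placeholders):

    import shlex

    from airflow.providers.google.cloud.hooks.dataflow import _DataflowRunner

    sample_log = "INFO: Submitted job: 2020-06-15_sample_id"  # placeholder log line

    # Re-emit the log line by line through bash so the runner can scan it for a job ID.
    echos = ";".join(f"echo {shlex.quote(line)}" for line in sample_log.split("\n"))
    runner = _DataflowRunner(["bash", "-c", echos])
    assert runner.wait_for_done() == "2020-06-15_sample_id"
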
diff --git a/tests/providers/google/cloud/operators/test_dataflow_system.py b/tests/providers/google/cloud/operators/test_dataflow_system.py
index 3a42cb1..9308d55 100644
--- a/tests/providers/google/cloud/operators/test_dataflow_system.py
+++ b/tests/providers/google/cloud/operators/test_dataflow_system.py
@@ -25,5 +25,13 @@ from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTe
 @pytest.mark.credential_file(GCP_DATAFLOW_KEY)
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_DATAFLOW_KEY)
-    def test_run_example_dag_function(self):
-        self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+    def test_run_example_gcp_dataflow_native_java(self):
+        self.run_dag('example_gcp_dataflow_native_java', CLOUD_DAG_FOLDER)
+
+    @provide_gcp_context(GCP_DATAFLOW_KEY)
+    def test_run_example_gcp_dataflow_native_python(self):
+        self.run_dag('example_gcp_dataflow_native_python', CLOUD_DAG_FOLDER)
+
+    @provide_gcp_context(GCP_DATAFLOW_KEY)
+    def test_run_example_gcp_dataflow_template(self):
+        self.run_dag('example_gcp_dataflow_template', CLOUD_DAG_FOLDER)