Posted to commits@airflow.apache.org by cr...@apache.org on 2017/01/26 19:21:21 UTC

incubator-airflow git commit: [AIRFLOW-782] Add support for DataFlowPythonOperator.

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 5184c6e2f -> dc97bcd3b


[AIRFLOW-782] Add support for DataFlowPythonOperator.

DataFlowPythonOperator allows users to define a GCP Dataflow task whose
pipeline job is specified in Python. The corresponding unit tests are also
included. To run the tests:
nosetests tests.contrib.hooks.gcp_dataflow_hook:DataFlowHookTest and
nosetests tests.contrib.operators.dataflow_operator:DataFlowPythonOperatorTest.

Closes #2025 from fenglu-g/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dc97bcd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dc97bcd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dc97bcd3

Branch: refs/heads/master
Commit: dc97bcd3b7e0a7eebd838f0fb0452a0b47ba417b
Parents: 5184c6e
Author: Feng Lu <fe...@fengcloud.hot.corp.google.com>
Authored: Thu Jan 26 11:20:52 2017 -0800
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Thu Jan 26 11:20:52 2017 -0800

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataflow_hook.py     | 33 +++++---
 airflow/contrib/operators/dataflow_operator.py | 85 +++++++++++++++++++--
 tests/contrib/hooks/gcp_dataflow_hook.py       | 56 ++++++++++++++
 tests/contrib/operators/dataflow_operator.py   | 76 ++++++++++++++++++
 4 files changed, 232 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
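
For reference, a minimal usage sketch of the new operator. The DAG id, project
id, bucket paths, and schedule below are hypothetical placeholders, not part of
this commit; the pipeline module and py_options mirror the constants used in
the included tests:

    # Hypothetical example DAG; project id and bucket names are placeholders.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

    dag = DAG('dataflow_python_example',
              start_date=datetime(2017, 1, 1),
              schedule_interval=None)

    wordcount = DataFlowPythonOperator(
        task_id='wordcount',
        py_file='apache_beam.examples.wordcount',  # module run with -m, as in the
        py_options=['-m'],                         # included tests; a local .py path
                                                   # also works with py_options=[]
        dataflow_default_options={                 # high-level options shared across
            'project': 'my-gcp-project',           # dataflow operators in the DAG
            'staging_location': 'gs://my-bucket/staging',
        },
        options={'output': 'gs://my-bucket/output'},  # job-specific options
        dag=dag)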


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc97bcd3/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index bd5bd3c..aaa9992 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -24,6 +24,7 @@ from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 
 
 class _DataflowJob(object):
+
     def __init__(self, dataflow, project_number, name):
         self._dataflow = dataflow
         self._project_number = project_number
@@ -82,7 +83,8 @@ class _DataflowJob(object):
         return self._job
 
 
-class _DataflowJava(object):
+class _Dataflow(object):
+
     def __init__(self, cmd):
         self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
@@ -113,11 +115,12 @@ class _DataflowJava(object):
             else:
                 logging.info("Waiting for DataFlow process to complete.")
         if self._proc.returncode is not 0:
-            raise Exception("DataFlow jar failed with return code {}".format(
+            raise Exception("DataFlow failed with return code {}".format(
                 self._proc.returncode))
 
 
 class DataFlowHook(GoogleCloudBaseHook):
+
     def __init__(self,
                  gcp_conn_id='google_cloud_default',
                  delegate_to=None):
@@ -130,21 +133,27 @@ class DataFlowHook(GoogleCloudBaseHook):
         http_authorized = self._authorize()
         return build('dataflow', 'v1b3', http=http_authorized)
 
+    def _start_dataflow(self, task_id, variables, dataflow, name, command_prefix):
+        cmd = command_prefix + self._build_cmd(task_id, variables, dataflow)
+        _Dataflow(cmd).wait_for_done()
+        _DataflowJob(
+            self.get_conn(), variables['project'], name).wait_for_done()
+
     def start_java_dataflow(self, task_id, variables, dataflow):
         name = task_id + "-" + str(uuid.uuid1())[:8]
-        cmd = self._build_cmd(task_id, variables, dataflow, name)
-        _DataflowJava(cmd).wait_for_done()
-        _DataflowJob(self.get_conn(), variables['project'], name).wait_for_done()
+        variables['jobName'] = name
+        self._start_dataflow(
+            task_id, variables, dataflow, name, ["java", "-jar"])
 
-    def _build_cmd(self, task_id, variables, dataflow, name):
-        command = ["java", "-jar",
-                   dataflow,
-                   "--runner=DataflowPipelineRunner",
-                   "--streaming=false",
-                   "--jobName=" + name]
+    def start_python_dataflow(self, task_id, variables, dataflow, py_options):
+        name = task_id + "-" + str(uuid.uuid1())[:8]
+        variables["job_name"] = name
+        self._start_dataflow(
+            task_id, variables, dataflow, name, ["python"] + py_options)
 
+    def _build_cmd(self, task_id, variables, dataflow):
+        command = [dataflow, "--runner=DataflowPipelineRunner"]
         if variables is not None:
             for attr, value in variables.iteritems():
                 command.append("--" + attr + "=" + value)
-
         return command

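As a stand-alone illustration of the command the refactored hook assembles
(project, job name, and file names below are made-up placeholders):

    # Sketch of the command construction performed by
    # DataFlowHook._start_dataflow / _build_cmd above; values are placeholders.
    def build_cmd(variables, dataflow):
        command = [dataflow, "--runner=DataflowPipelineRunner"]
        for attr, value in variables.items():
            command.append("--" + attr + "=" + value)
        return command

    py_variables = {
        'project': 'my-gcp-project',
        'job_name': 'my-task-1a2b3c4d',   # set by start_python_dataflow
        'staging_location': 'gs://my-bucket/staging',
    }
    # Python pipelines get the ["python"] + py_options prefix:
    print(["python", "-m"] + build_cmd(py_variables, 'wordcount.py'))
    # Java pipelines keep the ["java", "-jar"] prefix, with jobName set by
    # start_java_dataflow:
    print(["java", "-jar"] + build_cmd({'project': 'my-gcp-project',
                                        'jobName': 'my-task-1a2b3c4d'},
                                       'pipeline.jar'))
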
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc97bcd3/airflow/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index 10a6811..ef49eb6 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import copy
+import re
 
 from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
 from airflow.models import BaseOperator
@@ -70,9 +71,13 @@ class DataFlowJavaOperator(BaseOperator):
             *args,
             **kwargs):
         """
-        Create a new DataFlowJavaOperator.
+        Create a new DataFlowJavaOperator. Note that both
+        dataflow_default_options and options will be merged to specify
+        pipeline execution parameters, and dataflow_default_options is
+        expected to hold high-level options, for instance, project and zone
+        information, which apply to all dataflow operators in the DAG.
 
-        For more detail on about job submission have a look at the reference:
+        For more detail on job submission have a look at the reference:
 
         https://cloud.google.com/dataflow/pipelines/specifying-exec-params
 
@@ -82,11 +87,12 @@ class DataFlowJavaOperator(BaseOperator):
         :type dataflow_default_options: dict
         :param options: Map of job specific options.
         :type options: dict
-        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+            Platform.
         :type gcp_conn_id: string
         :param delegate_to: The account to impersonate, if any.
-            For this to work, the service account making the request must have domain-wide
-            delegation enabled.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
         :type delegate_to: string
         """
         super(DataFlowJavaOperator, self).__init__(*args, **kwargs)
@@ -101,9 +107,76 @@ class DataFlowJavaOperator(BaseOperator):
         self.options = options
 
     def execute(self, context):
-        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
 
         dataflow_options = copy.copy(self.dataflow_default_options)
         dataflow_options.update(self.options)
 
         hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
+
+
+class DataFlowPythonOperator(BaseOperator):
+
+    @apply_defaults
+    def __init__(
+            self,
+            py_file,
+            py_options=None,
+            dataflow_default_options=None,
+            options=None,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new DataFlowPythonOperator. Note that both
+        dataflow_default_options and options will be merged to specify
+        pipeline execution parameters, and dataflow_default_options is
+        expected to hold high-level options, for instance, project and zone
+        information, which apply to all dataflow operators in the DAG.
+
+        For more detail on job submission have a look at the reference:
+
+        https://cloud.google.com/dataflow/pipelines/specifying-exec-params
+
+        :param py_file: Reference to the python dataflow pipeline file, e.g.,
+            /some/local/file/path/to/your/python/pipeline/file.py.
+        :type py_file: string
+        :param py_options: Additional python options.
+        :type py_options: list of strings, e.g., ["-m", "-v"].
+        :param dataflow_default_options: Map of default job options.
+        :type dataflow_default_options: dict
+        :param options: Map of job specific options.
+        :type options: dict
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+            Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataFlowPythonOperator, self).__init__(*args, **kwargs)
+
+        self.py_file = py_file
+        self.py_options = py_options or []
+        self.dataflow_default_options = dataflow_default_options or {}
+        self.options = options or {}
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        """Execute the python dataflow job."""
+        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
+        dataflow_options = self.dataflow_default_options.copy()
+        dataflow_options.update(self.options)
+        # Convert argument names from lowerCamelCase to snake case.
+        camel_to_snake = lambda name: re.sub(
+            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
+        formatted_options = {camel_to_snake(key): dataflow_options[key]
+                             for key in dataflow_options}
+        hook.start_python_dataflow(
+            self.task_id, formatted_options,
+            self.py_file, self.py_options)

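The option-key normalization in DataFlowPythonOperator.execute can be
illustrated in isolation (option keys and values here are placeholders):

    import re

    # Same lowerCamelCase -> snake_case conversion used in execute() above,
    # so that keys such as 'stagingLocation' match the Python SDK's option
    # names.
    camel_to_snake = lambda name: re.sub(
        r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)

    options = {'stagingLocation': 'gs://my-bucket/staging',
               'output': 'gs://my-bucket/output'}
    print({camel_to_snake(k): v for k, v in options.items()})
    # -> {'staging_location': 'gs://my-bucket/staging',
    #     'output': 'gs://my-bucket/output'}
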
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc97bcd3/tests/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/gcp_dataflow_hook.py b/tests/contrib/hooks/gcp_dataflow_hook.py
new file mode 100644
index 0000000..797d40c
--- /dev/null
+++ b/tests/contrib/hooks/gcp_dataflow_hook.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+TASK_ID = 'test-python-dataflow'
+PY_FILE = 'apache_beam.examples.wordcount'
+PY_OPTIONS = ['-m']
+OPTIONS = {
+    'project': 'test',
+    'staging_location': 'gs://test/staging'
+}
+BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
+DATAFLOW_STRING = 'airflow.contrib.hooks.gcp_dataflow_hook.{}'
+
+
+def mock_init(self, gcp_conn_id, delegate_to=None):
+    pass
+
+
+class DataFlowHookTest(unittest.TestCase):
+
+    def setUp(self):
+        with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
+                        new=mock_init):
+            self.dataflow_hook = DataFlowHook(gcp_conn_id='test')
+
+    @mock.patch(DATAFLOW_STRING.format('DataFlowHook._start_dataflow'))
+    def test_start_python_dataflow(self, internal_dataflow_mock):
+        self.dataflow_hook.start_python_dataflow(
+            task_id=TASK_ID, variables=OPTIONS,
+            dataflow=PY_FILE, py_options=PY_OPTIONS)
+        internal_dataflow_mock.assert_called_once_with(
+            TASK_ID, OPTIONS, PY_FILE, mock.ANY, ['python'] + PY_OPTIONS)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc97bcd3/tests/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/dataflow_operator.py b/tests/contrib/operators/dataflow_operator.py
new file mode 100644
index 0000000..4f887c1
--- /dev/null
+++ b/tests/contrib/operators/dataflow_operator.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+TASK_ID = 'test-python-dataflow'
+PY_FILE = 'apache_beam.examples.wordcount'
+PY_OPTIONS = ['-m']
+DEFAULT_OPTIONS = {
+    'project': 'test',
+    'stagingLocation': 'gs://test/staging'
+}
+ADDITIONAL_OPTIONS = {
+    'output': 'gs://test/output'
+}
+
+
+class DataFlowPythonOperatorTest(unittest.TestCase):
+
+    def setUp(self):
+        self.dataflow = DataFlowPythonOperator(
+            task_id=TASK_ID,
+            py_file=PY_FILE,
+            py_options=PY_OPTIONS,
+            dataflow_default_options=DEFAULT_OPTIONS,
+            options=ADDITIONAL_OPTIONS)
+
+    def test_init(self):
+        """Test DataFlowPythonOperator instance is properly initialized."""
+        self.assertEqual(self.dataflow.task_id, TASK_ID)
+        self.assertEqual(self.dataflow.py_file, PY_FILE)
+        self.assertEqual(self.dataflow.py_options, PY_OPTIONS)
+        self.assertEqual(self.dataflow.dataflow_default_options,
+                         DEFAULT_OPTIONS)
+        self.assertEqual(self.dataflow.options,
+                         ADDITIONAL_OPTIONS)
+
+    @mock.patch('airflow.contrib.operators.dataflow_operator.DataFlowHook')
+    def test_exec(self, dataflow_mock):
+        """Test DataFlowHook is created and the right args are passed to
+        start_python_dataflow.
+
+        """
+        start_python_hook = dataflow_mock.return_value.start_python_dataflow
+        self.dataflow.execute(None)
+        assert dataflow_mock.called
+        expected_options = {
+            'project': 'test',
+            'staging_location': 'gs://test/staging',
+            'output': 'gs://test/output'
+        }
+        start_python_hook.assert_called_once_with(TASK_ID, expected_options,
+                                                  PY_FILE, PY_OPTIONS)