You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/13 04:45:07 UTC

[09/45] incubator-airflow git commit: Revert "Revert "[AIRFLOW-782] Add support for DataFlowPythonOperator.""

Revert "Revert "[AIRFLOW-782] Add support for DataFlowPythonOperator.""

This reverts commit 7e65998a1bedd00e74fa333cfee78ad574aaa849.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eddecd59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eddecd59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eddecd59

Branch: refs/heads/v1-8-stable
Commit: eddecd59d73191904f2f156e53a138e532dc560a
Parents: 8aacc28
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sun Feb 12 13:10:33 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Feb 12 13:10:33 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataflow_hook.py     | 33 +++++---
 airflow/contrib/operators/dataflow_operator.py | 85 +++++++++++++++++++--
 tests/contrib/hooks/gcp_dataflow_hook.py       | 56 ++++++++++++++
 tests/contrib/operators/dataflow_operator.py   | 76 ++++++++++++++++++
 4 files changed, 232 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eddecd59/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index bd5bd3c..aaa9992 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -24,6 +24,7 @@ from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 
 
 class _DataflowJob(object):
+
     def __init__(self, dataflow, project_number, name):
         self._dataflow = dataflow
         self._project_number = project_number
@@ -82,7 +83,8 @@ class _DataflowJob(object):
         return self._job
 
 
-class _DataflowJava(object):
+class _Dataflow(object):
+
     def __init__(self, cmd):
         self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
@@ -113,11 +115,12 @@ class _DataflowJava(object):
             else:
                 logging.info("Waiting for DataFlow process to complete.")
         if self._proc.returncode is not 0:
-            raise Exception("DataFlow jar failed with return code {}".format(
+            raise Exception("DataFlow failed with return code {}".format(
                 self._proc.returncode))
 
 
 class DataFlowHook(GoogleCloudBaseHook):
+
     def __init__(self,
                  gcp_conn_id='google_cloud_default',
                  delegate_to=None):
@@ -130,21 +133,27 @@ class DataFlowHook(GoogleCloudBaseHook):
         http_authorized = self._authorize()
         return build('dataflow', 'v1b3', http=http_authorized)
 
+    def _start_dataflow(self, task_id, variables, dataflow, name, command_prefix):
+        cmd = command_prefix + self._build_cmd(task_id, variables, dataflow)
+        _Dataflow(cmd).wait_for_done()
+        _DataflowJob(
+            self.get_conn(), variables['project'], name).wait_for_done()
+
     def start_java_dataflow(self, task_id, variables, dataflow):
         name = task_id + "-" + str(uuid.uuid1())[:8]
-        cmd = self._build_cmd(task_id, variables, dataflow, name)
-        _DataflowJava(cmd).wait_for_done()
-        _DataflowJob(self.get_conn(), variables['project'], name).wait_for_done()
+        variables['jobName'] = name
+        self._start_dataflow(
+            task_id, variables, dataflow, name, ["java", "-jar"])
 
-    def _build_cmd(self, task_id, variables, dataflow, name):
-        command = ["java", "-jar",
-                   dataflow,
-                   "--runner=DataflowPipelineRunner",
-                   "--streaming=false",
-                   "--jobName=" + name]
+    def start_python_dataflow(self, task_id, variables, dataflow, py_options):
+        name = task_id + "-" + str(uuid.uuid1())[:8]
+        variables["job_name"] = name
+        self._start_dataflow(
+            task_id, variables, dataflow, name, ["python"] + py_options)
 
+    def _build_cmd(self, task_id, variables, dataflow):
+        command = [dataflow, "--runner=DataflowPipelineRunner"]
         if variables is not None:
             for attr, value in variables.iteritems():
                 command.append("--" + attr + "=" + value)
-
         return command

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eddecd59/airflow/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index 10a6811..ef49eb6 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import copy
+import re
 
 from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
 from airflow.models import BaseOperator
@@ -70,9 +71,13 @@ class DataFlowJavaOperator(BaseOperator):
             *args,
             **kwargs):
         """
-        Create a new DataFlowJavaOperator.
+        Create a new DataFlowJavaOperator. Note that both
+        dataflow_default_options and options will be merged to specify pipeline
+        execution parameter, and dataflow_default_options is expected to save
+        high-level options, for instances, project and zone information, which
+        apply to all dataflow operators in the DAG.
 
-        For more detail on about job submission have a look at the reference:
+        For more detail on job submission have a look at the reference:
 
         https://cloud.google.com/dataflow/pipelines/specifying-exec-params
 
@@ -82,11 +87,12 @@ class DataFlowJavaOperator(BaseOperator):
         :type dataflow_default_options: dict
         :param options: Map of job specific options.
         :type options: dict
-        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+        Platform.
         :type gcp_conn_id: string
         :param delegate_to: The account to impersonate, if any.
-            For this to work, the service account making the request must have domain-wide
-            delegation enabled.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
         :type delegate_to: string
         """
         super(DataFlowJavaOperator, self).__init__(*args, **kwargs)
@@ -101,9 +107,76 @@ class DataFlowJavaOperator(BaseOperator):
         self.options = options
 
     def execute(self, context):
-        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
 
         dataflow_options = copy.copy(self.dataflow_default_options)
         dataflow_options.update(self.options)
 
         hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
+
+
+class DataFlowPythonOperator(BaseOperator):
+
+    @apply_defaults
+    def __init__(
+            self,
+            py_file,
+            py_options=None,
+            dataflow_default_options=None,
+            options=None,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new DataFlowPythonOperator. Note that both
+        dataflow_default_options and options will be merged to specify pipeline
+        execution parameter, and dataflow_default_options is expected to save
+        high-level options, for instances, project and zone information, which
+        apply to all dataflow operators in the DAG.
+
+        For more detail on job submission have a look at the reference:
+
+        https://cloud.google.com/dataflow/pipelines/specifying-exec-params
+
+        :param py_file: Reference to the python dataflow pipleline file, e.g.,
+            /some/local/file/path/to/your/python/pipeline/file.py.
+        :type py_file: string
+        :param py_options: Additional python options.
+        :type pyt_options: list of strings, e.g., ["-m", "-v"].
+        :param dataflow_default_options: Map of default job options.
+        :type dataflow_default_options: dict
+        :param options: Map of job specific options.
+        :type options: dict
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+            Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have
+            domain-wide  delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataFlowPythonOperator, self).__init__(*args, **kwargs)
+
+        self.py_file = py_file
+        self.py_options = py_options or []
+        self.dataflow_default_options = dataflow_default_options or {}
+        self.options = options or {}
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        """Execute the python dataflow job."""
+        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
+        dataflow_options = self.dataflow_default_options.copy()
+        dataflow_options.update(self.options)
+        # Convert argument names from lowerCamelCase to snake case.
+        camel_to_snake = lambda name: re.sub(
+            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
+        formatted_options = {camel_to_snake(key): dataflow_options[key]
+                             for key in dataflow_options}
+        hook.start_python_dataflow(
+            self.task_id, formatted_options,
+            self.py_file, self.py_options)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eddecd59/tests/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/gcp_dataflow_hook.py b/tests/contrib/hooks/gcp_dataflow_hook.py
new file mode 100644
index 0000000..797d40c
--- /dev/null
+++ b/tests/contrib/hooks/gcp_dataflow_hook.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+TASK_ID = 'test-python-dataflow'
+PY_FILE = 'apache_beam.examples.wordcount'
+PY_OPTIONS = ['-m']
+OPTIONS = {
+    'project': 'test',
+    'staging_location': 'gs://test/staging'
+}
+BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
+DATAFLOW_STRING = 'airflow.contrib.hooks.gcp_dataflow_hook.{}'
+
+
+def mock_init(self, gcp_conn_id, delegate_to=None):
+    pass
+
+
+class DataFlowHookTest(unittest.TestCase):
+
+    def setUp(self):
+        with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
+                        new=mock_init):
+            self.dataflow_hook = DataFlowHook(gcp_conn_id='test')
+
+    @mock.patch(DATAFLOW_STRING.format('DataFlowHook._start_dataflow'))
+    def test_start_python_dataflow(self, internal_dataflow_mock):
+        self.dataflow_hook.start_python_dataflow(
+            task_id=TASK_ID, variables=OPTIONS,
+            dataflow=PY_FILE, py_options=PY_OPTIONS)
+        internal_dataflow_mock.assert_called_once_with(
+            TASK_ID, OPTIONS, PY_FILE, mock.ANY, ['python'] + PY_OPTIONS)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eddecd59/tests/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/dataflow_operator.py b/tests/contrib/operators/dataflow_operator.py
new file mode 100644
index 0000000..4f887c1
--- /dev/null
+++ b/tests/contrib/operators/dataflow_operator.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+TASK_ID = 'test-python-dataflow'
+PY_FILE = 'apache_beam.examples.wordcount'
+PY_OPTIONS = ['-m']
+DEFAULT_OPTIONS = {
+    'project': 'test',
+    'stagingLocation': 'gs://test/staging'
+}
+ADDITIONAL_OPTIONS = {
+    'output': 'gs://test/output'
+}
+
+
+class DataFlowPythonOperatorTest(unittest.TestCase):
+
+    def setUp(self):
+        self.dataflow = DataFlowPythonOperator(
+            task_id=TASK_ID,
+            py_file=PY_FILE,
+            py_options=PY_OPTIONS,
+            dataflow_default_options=DEFAULT_OPTIONS,
+            options=ADDITIONAL_OPTIONS)
+
+    def test_init(self):
+        """Test DataFlowPythonOperator instance is properly initialized."""
+        self.assertEqual(self.dataflow.task_id, TASK_ID)
+        self.assertEqual(self.dataflow.py_file, PY_FILE)
+        self.assertEqual(self.dataflow.py_options, PY_OPTIONS)
+        self.assertEqual(self.dataflow.dataflow_default_options,
+                         DEFAULT_OPTIONS)
+        self.assertEqual(self.dataflow.options,
+                         ADDITIONAL_OPTIONS)
+
+    @mock.patch('airflow.contrib.operators.dataflow_operator.DataFlowHook')
+    def test_exec(self, dataflow_mock):
+        """Test DataFlowHook is created and the right args are passed to
+        start_python_workflow.
+
+        """
+        start_python_hook = dataflow_mock.return_value.start_python_dataflow
+        self.dataflow.execute(None)
+        assert dataflow_mock.called
+        expected_options = {
+            'project': 'test',
+            'staging_location': 'gs://test/staging',
+            'output': 'gs://test/output'
+        }
+        start_python_hook.assert_called_once_with(TASK_ID, expected_options,
+                                                  PY_FILE, PY_OPTIONS)