You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/02/01 15:56:30 UTC

incubator-airflow git commit: Revert "[AIRFLOW-782] Add support for DataFlowPythonOperator."

Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test 77715b9e7 -> 7e65998a1


Revert "[AIRFLOW-782] Add support for DataFlowPythonOperator."

This reverts commit dc97bcd3b7e0a7eebd838f0fb0452a0b47ba417b.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7e65998a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7e65998a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7e65998a

Branch: refs/heads/v1-8-test
Commit: 7e65998a1bedd00e74fa333cfee78ad574aaa849
Parents: 77715b9
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Feb 1 15:56:14 2017 +0000
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Feb 1 15:56:14 2017 +0000

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataflow_hook.py     | 33 +++-----
 airflow/contrib/operators/dataflow_operator.py | 85 ++-------------------
 tests/contrib/hooks/gcp_dataflow_hook.py       | 56 --------------
 tests/contrib/operators/dataflow_operator.py   | 76 ------------------
 4 files changed, 18 insertions(+), 232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e65998a/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index aaa9992..bd5bd3c 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -24,7 +24,6 @@ from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 
 
 class _DataflowJob(object):
-
     def __init__(self, dataflow, project_number, name):
         self._dataflow = dataflow
         self._project_number = project_number
@@ -83,8 +82,7 @@ class _DataflowJob(object):
         return self._job
 
 
-class _Dataflow(object):
-
+class _DataflowJava(object):
     def __init__(self, cmd):
         self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
@@ -115,12 +113,11 @@ class _Dataflow(object):
             else:
                 logging.info("Waiting for DataFlow process to complete.")
         if self._proc.returncode is not 0:
-            raise Exception("DataFlow failed with return code {}".format(
+            raise Exception("DataFlow jar failed with return code {}".format(
                 self._proc.returncode))
 
 
 class DataFlowHook(GoogleCloudBaseHook):
-
     def __init__(self,
                  gcp_conn_id='google_cloud_default',
                  delegate_to=None):
@@ -133,27 +130,21 @@ class DataFlowHook(GoogleCloudBaseHook):
         http_authorized = self._authorize()
         return build('dataflow', 'v1b3', http=http_authorized)
 
-    def _start_dataflow(self, task_id, variables, dataflow, name, command_prefix):
-        cmd = command_prefix + self._build_cmd(task_id, variables, dataflow)
-        _Dataflow(cmd).wait_for_done()
-        _DataflowJob(
-            self.get_conn(), variables['project'], name).wait_for_done()
-
     def start_java_dataflow(self, task_id, variables, dataflow):
         name = task_id + "-" + str(uuid.uuid1())[:8]
-        variables['jobName'] = name
-        self._start_dataflow(
-            task_id, variables, dataflow, name, ["java", "-jar"])
+        cmd = self._build_cmd(task_id, variables, dataflow, name)
+        _DataflowJava(cmd).wait_for_done()
+        _DataflowJob(self.get_conn(), variables['project'], name).wait_for_done()
 
-    def start_python_dataflow(self, task_id, variables, dataflow, py_options):
-        name = task_id + "-" + str(uuid.uuid1())[:8]
-        variables["job_name"] = name
-        self._start_dataflow(
-            task_id, variables, dataflow, name, ["python"] + py_options)
+    def _build_cmd(self, task_id, variables, dataflow, name):
+        command = ["java", "-jar",
+                   dataflow,
+                   "--runner=DataflowPipelineRunner",
+                   "--streaming=false",
+                   "--jobName=" + name]
 
-    def _build_cmd(self, task_id, variables, dataflow):
-        command = [dataflow, "--runner=DataflowPipelineRunner"]
         if variables is not None:
             for attr, value in variables.iteritems():
                 command.append("--" + attr + "=" + value)
+
         return command

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e65998a/airflow/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index ef49eb6..10a6811 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import copy
-import re
 
 from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
 from airflow.models import BaseOperator
@@ -71,13 +70,9 @@ class DataFlowJavaOperator(BaseOperator):
             *args,
             **kwargs):
         """
-        Create a new DataFlowJavaOperator. Note that both
-        dataflow_default_options and options will be merged to specify pipeline
-        execution parameters, and dataflow_default_options is expected to hold
-        high-level options, for instance, project and zone information, which
-        apply to all dataflow operators in the DAG.
+        Create a new DataFlowJavaOperator.
 
-        For more detail on job submission have a look at the reference:
+        For more details about job submission, have a look at the reference:
 
         https://cloud.google.com/dataflow/pipelines/specifying-exec-params
 
@@ -87,12 +82,11 @@ class DataFlowJavaOperator(BaseOperator):
         :type dataflow_default_options: dict
         :param options: Map of job specific options.
         :type options: dict
-        :param gcp_conn_id: The connection ID to use connecting to Google Cloud
-        Platform.
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
         :type gcp_conn_id: string
         :param delegate_to: The account to impersonate, if any.
-            For this to work, the service account making the request must have
-            domain-wide delegation enabled.
+            For this to work, the service account making the request must have domain-wide
+            delegation enabled.
         :type delegate_to: string
         """
         super(DataFlowJavaOperator, self).__init__(*args, **kwargs)
@@ -107,76 +101,9 @@ class DataFlowJavaOperator(BaseOperator):
         self.options = options
 
     def execute(self, context):
-        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
-                            delegate_to=self.delegate_to)
+        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
 
         dataflow_options = copy.copy(self.dataflow_default_options)
         dataflow_options.update(self.options)
 
         hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
-
-
-class DataFlowPythonOperator(BaseOperator):
-
-    @apply_defaults
-    def __init__(
-            self,
-            py_file,
-            py_options=None,
-            dataflow_default_options=None,
-            options=None,
-            gcp_conn_id='google_cloud_default',
-            delegate_to=None,
-            *args,
-            **kwargs):
-        """
-        Create a new DataFlowPythonOperator. Note that both
-        dataflow_default_options and options will be merged to specify pipeline
-        execution parameters, and dataflow_default_options is expected to hold
-        high-level options, for instance, project and zone information, which
-        apply to all dataflow operators in the DAG.
-
-        For more detail on job submission have a look at the reference:
-
-        https://cloud.google.com/dataflow/pipelines/specifying-exec-params
-
-        :param py_file: Reference to the python dataflow pipeline file, e.g.,
-            /some/local/file/path/to/your/python/pipeline/file.py.
-        :type py_file: string
-        :param py_options: Additional python options.
-        :type py_options: list of strings, e.g., ["-m", "-v"].
-        :param dataflow_default_options: Map of default job options.
-        :type dataflow_default_options: dict
-        :param options: Map of job specific options.
-        :type options: dict
-        :param gcp_conn_id: The connection ID to use connecting to Google Cloud
-            Platform.
-        :type gcp_conn_id: string
-        :param delegate_to: The account to impersonate, if any.
-            For this to work, the service account making the request must have
-            domain-wide delegation enabled.
-        :type delegate_to: string
-        """
-        super(DataFlowPythonOperator, self).__init__(*args, **kwargs)
-
-        self.py_file = py_file
-        self.py_options = py_options or []
-        self.dataflow_default_options = dataflow_default_options or {}
-        self.options = options or {}
-        self.gcp_conn_id = gcp_conn_id
-        self.delegate_to = delegate_to
-
-    def execute(self, context):
-        """Execute the python dataflow job."""
-        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
-                            delegate_to=self.delegate_to)
-        dataflow_options = self.dataflow_default_options.copy()
-        dataflow_options.update(self.options)
-        # Convert argument names from lowerCamelCase to snake case.
-        camel_to_snake = lambda name: re.sub(
-            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
-        formatted_options = {camel_to_snake(key): dataflow_options[key]
-                             for key in dataflow_options}
-        hook.start_python_dataflow(
-            self.task_id, formatted_options,
-            self.py_file, self.py_options)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e65998a/tests/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/gcp_dataflow_hook.py b/tests/contrib/hooks/gcp_dataflow_hook.py
deleted file mode 100644
index 797d40c..0000000
--- a/tests/contrib/hooks/gcp_dataflow_hook.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import unittest
-from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
-
-try:
-    from unittest import mock
-except ImportError:
-    try:
-        import mock
-    except ImportError:
-        mock = None
-
-
-TASK_ID = 'test-python-dataflow'
-PY_FILE = 'apache_beam.examples.wordcount'
-PY_OPTIONS = ['-m']
-OPTIONS = {
-    'project': 'test',
-    'staging_location': 'gs://test/staging'
-}
-BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
-DATAFLOW_STRING = 'airflow.contrib.hooks.gcp_dataflow_hook.{}'
-
-
-def mock_init(self, gcp_conn_id, delegate_to=None):
-    pass
-
-
-class DataFlowHookTest(unittest.TestCase):
-
-    def setUp(self):
-        with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
-                        new=mock_init):
-            self.dataflow_hook = DataFlowHook(gcp_conn_id='test')
-
-    @mock.patch(DATAFLOW_STRING.format('DataFlowHook._start_dataflow'))
-    def test_start_python_dataflow(self, internal_dataflow_mock):
-        self.dataflow_hook.start_python_dataflow(
-            task_id=TASK_ID, variables=OPTIONS,
-            dataflow=PY_FILE, py_options=PY_OPTIONS)
-        internal_dataflow_mock.assert_called_once_with(
-            TASK_ID, OPTIONS, PY_FILE, mock.ANY, ['python'] + PY_OPTIONS)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e65998a/tests/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/dataflow_operator.py b/tests/contrib/operators/dataflow_operator.py
deleted file mode 100644
index 4f887c1..0000000
--- a/tests/contrib/operators/dataflow_operator.py
+++ /dev/null
@@ -1,76 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import unittest
-
-from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
-
-try:
-    from unittest import mock
-except ImportError:
-    try:
-        import mock
-    except ImportError:
-        mock = None
-
-
-TASK_ID = 'test-python-dataflow'
-PY_FILE = 'apache_beam.examples.wordcount'
-PY_OPTIONS = ['-m']
-DEFAULT_OPTIONS = {
-    'project': 'test',
-    'stagingLocation': 'gs://test/staging'
-}
-ADDITIONAL_OPTIONS = {
-    'output': 'gs://test/output'
-}
-
-
-class DataFlowPythonOperatorTest(unittest.TestCase):
-
-    def setUp(self):
-        self.dataflow = DataFlowPythonOperator(
-            task_id=TASK_ID,
-            py_file=PY_FILE,
-            py_options=PY_OPTIONS,
-            dataflow_default_options=DEFAULT_OPTIONS,
-            options=ADDITIONAL_OPTIONS)
-
-    def test_init(self):
-        """Test DataFlowPythonOperator instance is properly initialized."""
-        self.assertEqual(self.dataflow.task_id, TASK_ID)
-        self.assertEqual(self.dataflow.py_file, PY_FILE)
-        self.assertEqual(self.dataflow.py_options, PY_OPTIONS)
-        self.assertEqual(self.dataflow.dataflow_default_options,
-                         DEFAULT_OPTIONS)
-        self.assertEqual(self.dataflow.options,
-                         ADDITIONAL_OPTIONS)
-
-    @mock.patch('airflow.contrib.operators.dataflow_operator.DataFlowHook')
-    def test_exec(self, dataflow_mock):
-        """Test DataFlowHook is created and the right args are passed to
-        start_python_dataflow.
-
-        """
-        start_python_hook = dataflow_mock.return_value.start_python_dataflow
-        self.dataflow.execute(None)
-        assert dataflow_mock.called
-        expected_options = {
-            'project': 'test',
-            'staging_location': 'gs://test/staging',
-            'output': 'gs://test/output'
-        }
-        start_python_hook.assert_called_once_with(TASK_ID, expected_options,
-                                                  PY_FILE, PY_OPTIONS)