You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/02/01 15:56:30 UTC
incubator-airflow git commit: Revert "[AIRFLOW-782] Add support for DataFlowPythonOperator."
Repository: incubator-airflow
Updated Branches:
refs/heads/v1-8-test 77715b9e7 -> 7e65998a1
Revert "[AIRFLOW-782] Add support for DataFlowPythonOperator."
This reverts commit dc97bcd3b7e0a7eebd838f0fb0452a0b47ba417b.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7e65998a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7e65998a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7e65998a
Branch: refs/heads/v1-8-test
Commit: 7e65998a1bedd00e74fa333cfee78ad574aaa849
Parents: 77715b9
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Feb 1 15:56:14 2017 +0000
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Feb 1 15:56:14 2017 +0000
----------------------------------------------------------------------
airflow/contrib/hooks/gcp_dataflow_hook.py | 33 +++-----
airflow/contrib/operators/dataflow_operator.py | 85 ++-------------------
tests/contrib/hooks/gcp_dataflow_hook.py | 56 --------------
tests/contrib/operators/dataflow_operator.py | 76 ------------------
4 files changed, 18 insertions(+), 232 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e65998a/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index aaa9992..bd5bd3c 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -24,7 +24,6 @@ from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
class _DataflowJob(object):
-
def __init__(self, dataflow, project_number, name):
self._dataflow = dataflow
self._project_number = project_number
@@ -83,8 +82,7 @@ class _DataflowJob(object):
return self._job
-class _Dataflow(object):
-
+class _DataflowJava(object):
def __init__(self, cmd):
self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
@@ -115,12 +113,11 @@ class _Dataflow(object):
else:
logging.info("Waiting for DataFlow process to complete.")
if self._proc.returncode is not 0:
- raise Exception("DataFlow failed with return code {}".format(
+ raise Exception("DataFlow jar failed with return code {}".format(
self._proc.returncode))
class DataFlowHook(GoogleCloudBaseHook):
-
def __init__(self,
gcp_conn_id='google_cloud_default',
delegate_to=None):
@@ -133,27 +130,21 @@ class DataFlowHook(GoogleCloudBaseHook):
http_authorized = self._authorize()
return build('dataflow', 'v1b3', http=http_authorized)
- def _start_dataflow(self, task_id, variables, dataflow, name, command_prefix):
- cmd = command_prefix + self._build_cmd(task_id, variables, dataflow)
- _Dataflow(cmd).wait_for_done()
- _DataflowJob(
- self.get_conn(), variables['project'], name).wait_for_done()
-
def start_java_dataflow(self, task_id, variables, dataflow):
name = task_id + "-" + str(uuid.uuid1())[:8]
- variables['jobName'] = name
- self._start_dataflow(
- task_id, variables, dataflow, name, ["java", "-jar"])
+ cmd = self._build_cmd(task_id, variables, dataflow, name)
+ _DataflowJava(cmd).wait_for_done()
+ _DataflowJob(self.get_conn(), variables['project'], name).wait_for_done()
- def start_python_dataflow(self, task_id, variables, dataflow, py_options):
- name = task_id + "-" + str(uuid.uuid1())[:8]
- variables["job_name"] = name
- self._start_dataflow(
- task_id, variables, dataflow, name, ["python"] + py_options)
+ def _build_cmd(self, task_id, variables, dataflow, name):
+ command = ["java", "-jar",
+ dataflow,
+ "--runner=DataflowPipelineRunner",
+ "--streaming=false",
+ "--jobName=" + name]
- def _build_cmd(self, task_id, variables, dataflow):
- command = [dataflow, "--runner=DataflowPipelineRunner"]
if variables is not None:
for attr, value in variables.iteritems():
command.append("--" + attr + "=" + value)
+
return command
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e65998a/airflow/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index ef49eb6..10a6811 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -13,7 +13,6 @@
# limitations under the License.
import copy
-import re
from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
from airflow.models import BaseOperator
@@ -71,13 +70,9 @@ class DataFlowJavaOperator(BaseOperator):
*args,
**kwargs):
"""
- Create a new DataFlowJavaOperator. Note that both
- dataflow_default_options and options will be merged to specify pipeline
- execution parameter, and dataflow_default_options is expected to save
- high-level options, for instances, project and zone information, which
- apply to all dataflow operators in the DAG.
+ Create a new DataFlowJavaOperator.
- For more detail on job submission have a look at the reference:
+ For more detail on about job submission have a look at the reference:
https://cloud.google.com/dataflow/pipelines/specifying-exec-params
@@ -87,12 +82,11 @@ class DataFlowJavaOperator(BaseOperator):
:type dataflow_default_options: dict
:param options: Map of job specific options.
:type options: dict
- :param gcp_conn_id: The connection ID to use connecting to Google Cloud
- Platform.
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
:type gcp_conn_id: string
:param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
- domain-wide delegation enabled.
+ For this to work, the service account making the request must have domain-wide
+ delegation enabled.
:type delegate_to: string
"""
super(DataFlowJavaOperator, self).__init__(*args, **kwargs)
@@ -107,76 +101,9 @@ class DataFlowJavaOperator(BaseOperator):
self.options = options
def execute(self, context):
- hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
- delegate_to=self.delegate_to)
+ hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
dataflow_options = copy.copy(self.dataflow_default_options)
dataflow_options.update(self.options)
hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
-
-
-class DataFlowPythonOperator(BaseOperator):
-
- @apply_defaults
- def __init__(
- self,
- py_file,
- py_options=None,
- dataflow_default_options=None,
- options=None,
- gcp_conn_id='google_cloud_default',
- delegate_to=None,
- *args,
- **kwargs):
- """
- Create a new DataFlowPythonOperator. Note that both
- dataflow_default_options and options will be merged to specify pipeline
- execution parameter, and dataflow_default_options is expected to save
- high-level options, for instances, project and zone information, which
- apply to all dataflow operators in the DAG.
-
- For more detail on job submission have a look at the reference:
-
- https://cloud.google.com/dataflow/pipelines/specifying-exec-params
-
- :param py_file: Reference to the python dataflow pipleline file, e.g.,
- /some/local/file/path/to/your/python/pipeline/file.py.
- :type py_file: string
- :param py_options: Additional python options.
- :type pyt_options: list of strings, e.g., ["-m", "-v"].
- :param dataflow_default_options: Map of default job options.
- :type dataflow_default_options: dict
- :param options: Map of job specific options.
- :type options: dict
- :param gcp_conn_id: The connection ID to use connecting to Google Cloud
- Platform.
- :type gcp_conn_id: string
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
- domain-wide delegation enabled.
- :type delegate_to: string
- """
- super(DataFlowPythonOperator, self).__init__(*args, **kwargs)
-
- self.py_file = py_file
- self.py_options = py_options or []
- self.dataflow_default_options = dataflow_default_options or {}
- self.options = options or {}
- self.gcp_conn_id = gcp_conn_id
- self.delegate_to = delegate_to
-
- def execute(self, context):
- """Execute the python dataflow job."""
- hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
- delegate_to=self.delegate_to)
- dataflow_options = self.dataflow_default_options.copy()
- dataflow_options.update(self.options)
- # Convert argument names from lowerCamelCase to snake case.
- camel_to_snake = lambda name: re.sub(
- r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
- formatted_options = {camel_to_snake(key): dataflow_options[key]
- for key in dataflow_options}
- hook.start_python_dataflow(
- self.task_id, formatted_options,
- self.py_file, self.py_options)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e65998a/tests/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/gcp_dataflow_hook.py b/tests/contrib/hooks/gcp_dataflow_hook.py
deleted file mode 100644
index 797d40c..0000000
--- a/tests/contrib/hooks/gcp_dataflow_hook.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import unittest
-from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
-
-try:
- from unittest import mock
-except ImportError:
- try:
- import mock
- except ImportError:
- mock = None
-
-
-TASK_ID = 'test-python-dataflow'
-PY_FILE = 'apache_beam.examples.wordcount'
-PY_OPTIONS = ['-m']
-OPTIONS = {
- 'project': 'test',
- 'staging_location': 'gs://test/staging'
-}
-BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
-DATAFLOW_STRING = 'airflow.contrib.hooks.gcp_dataflow_hook.{}'
-
-
-def mock_init(self, gcp_conn_id, delegate_to=None):
- pass
-
-
-class DataFlowHookTest(unittest.TestCase):
-
- def setUp(self):
- with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
- new=mock_init):
- self.dataflow_hook = DataFlowHook(gcp_conn_id='test')
-
- @mock.patch(DATAFLOW_STRING.format('DataFlowHook._start_dataflow'))
- def test_start_python_dataflow(self, internal_dataflow_mock):
- self.dataflow_hook.start_python_dataflow(
- task_id=TASK_ID, variables=OPTIONS,
- dataflow=PY_FILE, py_options=PY_OPTIONS)
- internal_dataflow_mock.assert_called_once_with(
- TASK_ID, OPTIONS, PY_FILE, mock.ANY, ['python'] + PY_OPTIONS)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e65998a/tests/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/dataflow_operator.py b/tests/contrib/operators/dataflow_operator.py
deleted file mode 100644
index 4f887c1..0000000
--- a/tests/contrib/operators/dataflow_operator.py
+++ /dev/null
@@ -1,76 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import unittest
-
-from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
-
-try:
- from unittest import mock
-except ImportError:
- try:
- import mock
- except ImportError:
- mock = None
-
-
-TASK_ID = 'test-python-dataflow'
-PY_FILE = 'apache_beam.examples.wordcount'
-PY_OPTIONS = ['-m']
-DEFAULT_OPTIONS = {
- 'project': 'test',
- 'stagingLocation': 'gs://test/staging'
-}
-ADDITIONAL_OPTIONS = {
- 'output': 'gs://test/output'
-}
-
-
-class DataFlowPythonOperatorTest(unittest.TestCase):
-
- def setUp(self):
- self.dataflow = DataFlowPythonOperator(
- task_id=TASK_ID,
- py_file=PY_FILE,
- py_options=PY_OPTIONS,
- dataflow_default_options=DEFAULT_OPTIONS,
- options=ADDITIONAL_OPTIONS)
-
- def test_init(self):
- """Test DataFlowPythonOperator instance is properly initialized."""
- self.assertEqual(self.dataflow.task_id, TASK_ID)
- self.assertEqual(self.dataflow.py_file, PY_FILE)
- self.assertEqual(self.dataflow.py_options, PY_OPTIONS)
- self.assertEqual(self.dataflow.dataflow_default_options,
- DEFAULT_OPTIONS)
- self.assertEqual(self.dataflow.options,
- ADDITIONAL_OPTIONS)
-
- @mock.patch('airflow.contrib.operators.dataflow_operator.DataFlowHook')
- def test_exec(self, dataflow_mock):
- """Test DataFlowHook is created and the right args are passed to
- start_python_workflow.
-
- """
- start_python_hook = dataflow_mock.return_value.start_python_dataflow
- self.dataflow.execute(None)
- assert dataflow_mock.called
- expected_options = {
- 'project': 'test',
- 'staging_location': 'gs://test/staging',
- 'output': 'gs://test/output'
- }
- start_python_hook.assert_called_once_with(TASK_ID, expected_options,
- PY_FILE, PY_OPTIONS)