You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/13 04:45:07 UTC
[09/45] incubator-airflow git commit: Revert "Revert "[AIRFLOW-782]
Add support for DataFlowPythonOperator.""
Revert "Revert "[AIRFLOW-782] Add support for DataFlowPythonOperator.""
This reverts commit 7e65998a1bedd00e74fa333cfee78ad574aaa849.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eddecd59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eddecd59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eddecd59
Branch: refs/heads/v1-8-stable
Commit: eddecd59d73191904f2f156e53a138e532dc560a
Parents: 8aacc28
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sun Feb 12 13:10:33 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Feb 12 13:10:33 2017 +0100
----------------------------------------------------------------------
airflow/contrib/hooks/gcp_dataflow_hook.py | 33 +++++---
airflow/contrib/operators/dataflow_operator.py | 85 +++++++++++++++++++--
tests/contrib/hooks/gcp_dataflow_hook.py | 56 ++++++++++++++
tests/contrib/operators/dataflow_operator.py | 76 ++++++++++++++++++
4 files changed, 232 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eddecd59/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index bd5bd3c..aaa9992 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -24,6 +24,7 @@ from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
class _DataflowJob(object):
+
def __init__(self, dataflow, project_number, name):
self._dataflow = dataflow
self._project_number = project_number
@@ -82,7 +83,8 @@ class _DataflowJob(object):
return self._job
-class _DataflowJava(object):
+class _Dataflow(object):
+
def __init__(self, cmd):
self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
@@ -113,11 +115,12 @@ class _DataflowJava(object):
else:
logging.info("Waiting for DataFlow process to complete.")
if self._proc.returncode is not 0:
- raise Exception("DataFlow jar failed with return code {}".format(
+ raise Exception("DataFlow failed with return code {}".format(
self._proc.returncode))
class DataFlowHook(GoogleCloudBaseHook):
+
def __init__(self,
gcp_conn_id='google_cloud_default',
delegate_to=None):
@@ -130,21 +133,27 @@ class DataFlowHook(GoogleCloudBaseHook):
http_authorized = self._authorize()
return build('dataflow', 'v1b3', http=http_authorized)
+ def _start_dataflow(self, task_id, variables, dataflow, name, command_prefix):
+ cmd = command_prefix + self._build_cmd(task_id, variables, dataflow)
+ _Dataflow(cmd).wait_for_done()
+ _DataflowJob(
+ self.get_conn(), variables['project'], name).wait_for_done()
+
def start_java_dataflow(self, task_id, variables, dataflow):
name = task_id + "-" + str(uuid.uuid1())[:8]
- cmd = self._build_cmd(task_id, variables, dataflow, name)
- _DataflowJava(cmd).wait_for_done()
- _DataflowJob(self.get_conn(), variables['project'], name).wait_for_done()
+ variables['jobName'] = name
+ self._start_dataflow(
+ task_id, variables, dataflow, name, ["java", "-jar"])
- def _build_cmd(self, task_id, variables, dataflow, name):
- command = ["java", "-jar",
- dataflow,
- "--runner=DataflowPipelineRunner",
- "--streaming=false",
- "--jobName=" + name]
+ def start_python_dataflow(self, task_id, variables, dataflow, py_options):
+ name = task_id + "-" + str(uuid.uuid1())[:8]
+ variables["job_name"] = name
+ self._start_dataflow(
+ task_id, variables, dataflow, name, ["python"] + py_options)
+ def _build_cmd(self, task_id, variables, dataflow):
+ command = [dataflow, "--runner=DataflowPipelineRunner"]
if variables is not None:
for attr, value in variables.iteritems():
command.append("--" + attr + "=" + value)
-
return command
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eddecd59/airflow/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index 10a6811..ef49eb6 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -13,6 +13,7 @@
# limitations under the License.
import copy
+import re
from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
from airflow.models import BaseOperator
@@ -70,9 +71,13 @@ class DataFlowJavaOperator(BaseOperator):
*args,
**kwargs):
"""
- Create a new DataFlowJavaOperator.
+ Create a new DataFlowJavaOperator. Note that both
+ dataflow_default_options and options will be merged to specify pipeline
+ execution parameters, and dataflow_default_options is expected to hold
+ high-level options, for instance, project and zone information, which
+ apply to all dataflow operators in the DAG.
- For more detail on about job submission have a look at the reference:
+ For more detail on job submission have a look at the reference:
https://cloud.google.com/dataflow/pipelines/specifying-exec-params
@@ -82,11 +87,12 @@ class DataFlowJavaOperator(BaseOperator):
:type dataflow_default_options: dict
:param options: Map of job specific options.
:type options: dict
- :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+ Platform.
:type gcp_conn_id: string
:param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have domain-wide
- delegation enabled.
+ For this to work, the service account making the request must have
+ domain-wide delegation enabled.
:type delegate_to: string
"""
super(DataFlowJavaOperator, self).__init__(*args, **kwargs)
@@ -101,9 +107,76 @@ class DataFlowJavaOperator(BaseOperator):
self.options = options
def execute(self, context):
- hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+ hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to)
dataflow_options = copy.copy(self.dataflow_default_options)
dataflow_options.update(self.options)
hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
+
+
+class DataFlowPythonOperator(BaseOperator):
+
+ @apply_defaults
+ def __init__(
+ self,
+ py_file,
+ py_options=None,
+ dataflow_default_options=None,
+ options=None,
+ gcp_conn_id='google_cloud_default',
+ delegate_to=None,
+ *args,
+ **kwargs):
+ """
+ Create a new DataFlowPythonOperator. Note that both
+ dataflow_default_options and options will be merged to specify pipeline
+ execution parameters, and dataflow_default_options is expected to hold
+ high-level options, for instance, project and zone information, which
+ apply to all dataflow operators in the DAG.
+
+ For more detail on job submission have a look at the reference:
+
+ https://cloud.google.com/dataflow/pipelines/specifying-exec-params
+
+ :param py_file: Reference to the python dataflow pipeline file, e.g.,
+ /some/local/file/path/to/your/python/pipeline/file.py.
+ :type py_file: string
+ :param py_options: Additional python options.
+ :type py_options: list of strings, e.g., ["-m", "-v"].
+ :param dataflow_default_options: Map of default job options.
+ :type dataflow_default_options: dict
+ :param options: Map of job specific options.
+ :type options: dict
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+ Platform.
+ :type gcp_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have
+ domain-wide delegation enabled.
+ :type delegate_to: string
+ """
+ super(DataFlowPythonOperator, self).__init__(*args, **kwargs)
+
+ self.py_file = py_file
+ self.py_options = py_options or []
+ self.dataflow_default_options = dataflow_default_options or {}
+ self.options = options or {}
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+
+ def execute(self, context):
+ """Execute the python dataflow job."""
+ hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to)
+ dataflow_options = self.dataflow_default_options.copy()
+ dataflow_options.update(self.options)
+ # Convert argument names from lowerCamelCase to snake case.
+ camel_to_snake = lambda name: re.sub(
+ r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
+ formatted_options = {camel_to_snake(key): dataflow_options[key]
+ for key in dataflow_options}
+ hook.start_python_dataflow(
+ self.task_id, formatted_options,
+ self.py_file, self.py_options)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eddecd59/tests/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/gcp_dataflow_hook.py b/tests/contrib/hooks/gcp_dataflow_hook.py
new file mode 100644
index 0000000..797d40c
--- /dev/null
+++ b/tests/contrib/hooks/gcp_dataflow_hook.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
+
+try:
+ from unittest import mock
+except ImportError:
+ try:
+ import mock
+ except ImportError:
+ mock = None
+
+
+TASK_ID = 'test-python-dataflow'
+PY_FILE = 'apache_beam.examples.wordcount'
+PY_OPTIONS = ['-m']
+OPTIONS = {
+ 'project': 'test',
+ 'staging_location': 'gs://test/staging'
+}
+BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
+DATAFLOW_STRING = 'airflow.contrib.hooks.gcp_dataflow_hook.{}'
+
+
+def mock_init(self, gcp_conn_id, delegate_to=None):
+ pass
+
+
+class DataFlowHookTest(unittest.TestCase):
+
+ def setUp(self):
+ with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
+ new=mock_init):
+ self.dataflow_hook = DataFlowHook(gcp_conn_id='test')
+
+ @mock.patch(DATAFLOW_STRING.format('DataFlowHook._start_dataflow'))
+ def test_start_python_dataflow(self, internal_dataflow_mock):
+ self.dataflow_hook.start_python_dataflow(
+ task_id=TASK_ID, variables=OPTIONS,
+ dataflow=PY_FILE, py_options=PY_OPTIONS)
+ internal_dataflow_mock.assert_called_once_with(
+ TASK_ID, OPTIONS, PY_FILE, mock.ANY, ['python'] + PY_OPTIONS)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eddecd59/tests/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/dataflow_operator.py b/tests/contrib/operators/dataflow_operator.py
new file mode 100644
index 0000000..4f887c1
--- /dev/null
+++ b/tests/contrib/operators/dataflow_operator.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
+
+try:
+ from unittest import mock
+except ImportError:
+ try:
+ import mock
+ except ImportError:
+ mock = None
+
+
+TASK_ID = 'test-python-dataflow'
+PY_FILE = 'apache_beam.examples.wordcount'
+PY_OPTIONS = ['-m']
+DEFAULT_OPTIONS = {
+ 'project': 'test',
+ 'stagingLocation': 'gs://test/staging'
+}
+ADDITIONAL_OPTIONS = {
+ 'output': 'gs://test/output'
+}
+
+
+class DataFlowPythonOperatorTest(unittest.TestCase):
+
+ def setUp(self):
+ self.dataflow = DataFlowPythonOperator(
+ task_id=TASK_ID,
+ py_file=PY_FILE,
+ py_options=PY_OPTIONS,
+ dataflow_default_options=DEFAULT_OPTIONS,
+ options=ADDITIONAL_OPTIONS)
+
+ def test_init(self):
+ """Test DataFlowPythonOperator instance is properly initialized."""
+ self.assertEqual(self.dataflow.task_id, TASK_ID)
+ self.assertEqual(self.dataflow.py_file, PY_FILE)
+ self.assertEqual(self.dataflow.py_options, PY_OPTIONS)
+ self.assertEqual(self.dataflow.dataflow_default_options,
+ DEFAULT_OPTIONS)
+ self.assertEqual(self.dataflow.options,
+ ADDITIONAL_OPTIONS)
+
+ @mock.patch('airflow.contrib.operators.dataflow_operator.DataFlowHook')
+ def test_exec(self, dataflow_mock):
+ """Test DataFlowHook is created and the right args are passed to
+ start_python_dataflow.
+
+ """
+ start_python_hook = dataflow_mock.return_value.start_python_dataflow
+ self.dataflow.execute(None)
+ assert dataflow_mock.called
+ expected_options = {
+ 'project': 'test',
+ 'staging_location': 'gs://test/staging',
+ 'output': 'gs://test/output'
+ }
+ start_python_hook.assert_called_once_with(TASK_ID, expected_options,
+ PY_FILE, PY_OPTIONS)