Posted to commits@airflow.apache.org by cr...@apache.org on 2018/01/04 21:44:44 UTC

incubator-airflow git commit: [AIRFLOW-1954] Add DataFlowTemplateOperator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master b3489b99e -> 3c5b73579


[AIRFLOW-1954] Add DataFlowTemplateOperator

Closes #2909 from dsdinter/dataflow_template


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3c5b7357
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3c5b7357
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3c5b7357

Branch: refs/heads/master
Commit: 3c5b73579a3e6c8a1e47c2fddf201b99d690bafe
Parents: b3489b9
Author: David Sabater <da...@gmail.com>
Authored: Thu Jan 4 13:44:07 2018 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Thu Jan 4 13:44:10 2018 -0800

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataflow_hook.py      | 28 ++++++
 airflow/contrib/operators/dataflow_operator.py  | 98 +++++++++++++++++++-
 tests/contrib/hooks/test_gcp_dataflow_hook.py   | 29 +++++-
 .../contrib/operators/test_dataflow_operator.py | 61 +++++++++++-
 4 files changed, 208 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c5b7357/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index 7cb7c79..f9970d9 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -166,6 +166,11 @@ class DataFlowHook(GoogleCloudBaseHook):
         self._start_dataflow(task_id, variables, dataflow, name,
                              ["java", "-jar"], label_formatter)
 
+    def start_template_dataflow(self, task_id, variables, parameters, dataflow_template):
+        name = task_id + "-" + str(uuid.uuid1())[:8]
+        self._start_template_dataflow(
+            name, variables, parameters, dataflow_template)
+
     def start_python_dataflow(self, task_id, variables, dataflow, py_options):
         name = task_id + "-" + str(uuid.uuid1())[:8]
         variables["job_name"] = name
@@ -185,3 +190,26 @@ class DataFlowHook(GoogleCloudBaseHook):
                 else:
                     command.append("--" + attr + "=" + value)
         return command
+
+    def _start_template_dataflow(self, name, variables, parameters, dataflow_template):
+        # Builds RuntimeEnvironment from variables dictionary
+        # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
+        environment = {}
+        for key in ['maxWorkers', 'zone', 'serviceAccountEmail', 'tempLocation',
+                    'bypassTempDirValidation', 'machineType']:
+            if key in variables:
+                environment.update({key: variables[key]})
+        body = {"jobName": name,
+                "parameters": parameters,
+                "environment": environment}
+        service = self.get_conn()
+        if variables['project'] is None:
+            raise Exception(
+                'Project not specified')
+        request = service.projects().templates().launch(projectId=variables['project'],
+                                                        gcsPath=dataflow_template,
+                                                        body=body)
+        response = request.execute()
+        _DataflowJob(
+            self.get_conn(), variables['project'], name, self.poll_sleep).wait_for_done()
+        return response

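The new hook entry point can also be exercised outside of an operator. Below is a minimal, hypothetical sketch of calling it directly; the connection id, project, zone, buckets and template path are placeholder assumptions, not values from this commit, and a real run needs a configured GCP connection.

```
from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook

# Placeholder values; any real run needs existing buckets and a template.
hook = DataFlowHook(gcp_conn_id='google_cloud_default', poll_sleep=10)
hook.start_template_dataflow(
    task_id='wordcount-example',
    variables={
        'project': 'my-gcp-project',            # required by _start_template_dataflow
        'zone': 'europe-west1-d',               # copied into RuntimeEnvironment
        'tempLocation': 'gs://my-bucket/temp/'  # copied into RuntimeEnvironment
    },
    parameters={
        'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
        'output': 'gs://my-bucket/output/wordcount'
    },
    dataflow_template='gs://dataflow-templates/wordcount/template_file')
```
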
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c5b7357/airflow/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index 01fbd35..915e26c 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -12,9 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import copy
 import re
 import uuid
+import copy
 
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
@@ -131,6 +131,102 @@ class DataFlowJavaOperator(BaseOperator):
         hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
 
 
+class DataflowTemplateOperator(BaseOperator):
+    """
+    Start a Templated Cloud DataFlow batch job. The parameters of the operation
+    will be passed to the job.
+    It is a good practice to define dataflow_* parameters in the default_args of the
+    DAG, such as the project, zone and staging location.
+    https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters
+    https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
+    ```
+    default_args = {
+        'dataflow_default_options': {
+            'project': 'my-gcp-project',
+            'zone': 'europe-west1-d',
+            'tempLocation': 'gs://my-staging-bucket/staging/'
+        }
+    }
+    ```
+    You need to pass the path to your dataflow template as a file reference with the
+    ``template`` parameter. Use ``parameters`` to pass on parameters to your job.
+    Use ``environment`` to pass on runtime environment variables to your job.
+    ```
+    t1 = DataflowTemplateOperator(
+        task_id='dataflow_example',
+        template='{{var.value.gcp_dataflow_base}}',
+        parameters={
+            'inputFile': "gs://bucket/input/my_input.txt",
+            'outputFile': "gs://bucket/output/my_output.txt"
+        },
+        dag=my_dag)
+    ```
+    ``template``, ``dataflow_default_options`` and ``parameters`` are templated, so you
+    can use variables in them.
+    """
+    template_fields = ['parameters', 'dataflow_default_options', 'template']
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            template,
+            dataflow_default_options=None,
+            parameters=None,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            poll_sleep=10,
+            *args,
+            **kwargs):
+        """
+        Create a new DataflowTemplateOperator. Note that
+        dataflow_default_options is expected to contain the high-level,
+        project-wide options that apply to all Dataflow operators in the DAG.
+        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters
+        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
+        For more detail on job template execution, have a look at the reference:
+        https://cloud.google.com/dataflow/docs/templates/executing-templates
+        :param template: The reference to the DataFlow template.
+        :type template: string
+        :param dataflow_default_options: Map of default job environment options.
+        :type dataflow_default_options: dict
+        :param parameters: Map of job specific parameters for the template.
+        :type parameters: dict
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud
+            Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: string
+        :param poll_sleep: The time in seconds to sleep between polling Google
+            Cloud Platform for the dataflow job status while the job is in the
+            JOB_STATE_RUNNING state.
+        :type poll_sleep: int
+        """
+        super(DataflowTemplateOperator, self).__init__(*args, **kwargs)
+
+        dataflow_default_options = dataflow_default_options or {}
+        parameters = parameters or {}
+
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.dataflow_default_options = dataflow_default_options
+        self.poll_sleep = poll_sleep
+        self.template = template
+        self.parameters = parameters
+
+    def execute(self, context):
+        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to,
+                            poll_sleep=self.poll_sleep)
+
+        hook.start_template_dataflow(self.task_id, self.dataflow_default_options,
+                                     self.parameters, self.template)
+
+
 class DataFlowPythonOperator(BaseOperator):
 
     template_fields = ['options', 'dataflow_default_options']

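For completeness, a hypothetical end-to-end DAG wiring of the new operator; the DAG id, schedule, project, zone and buckets are placeholder assumptions, and the template path is the public word-count sample also used in the tests below.

```
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

default_args = {
    'start_date': datetime(2018, 1, 1),
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'europe-west1-d',
        'tempLocation': 'gs://my-bucket/temp/'
    }
}

with DAG('dataflow_template_example', default_args=default_args,
         schedule_interval=None) as dag:
    # dataflow_default_options (picked up from default_args) provide the
    # runtime environment; `parameters` are passed to the template itself.
    launch_template = DataflowTemplateOperator(
        task_id='launch_wordcount_template',
        template='gs://dataflow-templates/wordcount/template_file',
        parameters={
            'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
            'output': 'gs://my-bucket/output/wordcount'
        })
```
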
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c5b7357/tests/contrib/hooks/test_gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_gcp_dataflow_hook.py b/tests/contrib/hooks/test_gcp_dataflow_hook.py
index a37b153..bf513c8 100644
--- a/tests/contrib/hooks/test_gcp_dataflow_hook.py
+++ b/tests/contrib/hooks/test_gcp_dataflow_hook.py
@@ -29,7 +29,12 @@ except ImportError:
         mock = None
 
 
-TASK_ID = 'test-python-dataflow'
+TASK_ID = 'test-dataflow-operator'
+TEMPLATE = 'gs://dataflow-templates/wordcount/template_file'
+PARAMETERS = {
+    'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
+    'output': 'gs://test/output/my_output'
+}
 PY_FILE = 'apache_beam.examples.wordcount'
 JAR_FILE = 'unitest.jar'
 PY_OPTIONS = ['-m']
@@ -43,6 +48,11 @@ DATAFLOW_OPTIONS_JAVA = {
     'stagingLocation': 'gs://test/staging',
     'labels': {'foo': 'bar'}
 }
+DATAFLOW_OPTIONS_TEMPLATE = {
+    'project': 'test',
+    'tempLocation': 'gs://test/temp',
+    'zone': 'us-central1-f'
+}
 BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
 DATAFLOW_STRING = 'airflow.contrib.hooks.gcp_dataflow_hook.{}'
 MOCK_UUID = '12345678'
@@ -52,7 +62,7 @@ def mock_init(self, gcp_conn_id, delegate_to=None):
     pass
 
 
-class DataFlowHookTest(unittest.TestCase):
+class DataFlowPythonHookTest(unittest.TestCase):
 
     def setUp(self):
         with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
@@ -129,3 +139,18 @@ class DataFlowHookTest(unittest.TestCase):
       self.assertRaises(Exception, dataflow.wait_for_done)
       mock_logging.warning.assert_has_calls([call('test'), call('error')])
 
+
+class DataFlowTemplateHookTest(unittest.TestCase):
+
+    def setUp(self):
+        with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
+                        new=mock_init):
+            self.dataflow_hook = DataFlowHook(gcp_conn_id='test')
+
+    @mock.patch(DATAFLOW_STRING.format('DataFlowHook._start_template_dataflow'))
+    def test_start_template_dataflow(self, internal_dataflow_mock):
+        self.dataflow_hook.start_template_dataflow(
+            task_id=TASK_ID, variables=DATAFLOW_OPTIONS_TEMPLATE, parameters=PARAMETERS,
+            dataflow_template=TEMPLATE)
+        internal_dataflow_mock.assert_called_once_with(
+            mock.ANY, DATAFLOW_OPTIONS_TEMPLATE, PARAMETERS, TEMPLATE)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c5b7357/tests/contrib/operators/test_dataflow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataflow_operator.py b/tests/contrib/operators/test_dataflow_operator.py
index 5b07051..da95d18 100644
--- a/tests/contrib/operators/test_dataflow_operator.py
+++ b/tests/contrib/operators/test_dataflow_operator.py
@@ -15,6 +15,8 @@
 
 import unittest
 
+from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator, \
+    DataflowTemplateOperator
 from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
 from airflow.version import version
 
@@ -27,12 +29,23 @@ except ImportError:
         mock = None
 
 
-TASK_ID = 'test-python-dataflow'
+TASK_ID = 'test-dataflow-operator'
+TEMPLATE = 'gs://dataflow-templates/wordcount/template_file'
+PARAMETERS = {
+    'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
+    'output': 'gs://test/output/my_output'
+}
 PY_FILE = 'gs://my-bucket/my-object.py'
 PY_OPTIONS = ['-m']
-DEFAULT_OPTIONS = {
+DEFAULT_OPTIONS_PYTHON = {
+    'project': 'test',
+    'stagingLocation': 'gs://test/staging',
+}
+DEFAULT_OPTIONS_TEMPLATE = {
     'project': 'test',
-    'stagingLocation': 'gs://test/staging'
+    'stagingLocation': 'gs://test/staging',
+    'tempLocation': 'gs://test/temp',
+    'zone': 'us-central1-f'
 }
 ADDITIONAL_OPTIONS = {
     'output': 'gs://test/output',
@@ -54,7 +67,7 @@ class DataFlowPythonOperatorTest(unittest.TestCase):
             task_id=TASK_ID,
             py_file=PY_FILE,
             py_options=PY_OPTIONS,
-            dataflow_default_options=DEFAULT_OPTIONS,
+            dataflow_default_options=DEFAULT_OPTIONS_PYTHON,
             options=ADDITIONAL_OPTIONS,
             poll_sleep=POLL_SLEEP)
 
@@ -65,7 +78,7 @@ class DataFlowPythonOperatorTest(unittest.TestCase):
         self.assertEqual(self.dataflow.py_options, PY_OPTIONS)
         self.assertEqual(self.dataflow.poll_sleep, POLL_SLEEP)
         self.assertEqual(self.dataflow.dataflow_default_options,
-                         DEFAULT_OPTIONS)
+                         DEFAULT_OPTIONS_PYTHON)
         self.assertEqual(self.dataflow.options,
                          EXPECTED_ADDITIONAL_OPTIONS)
 
@@ -90,3 +103,41 @@ class DataFlowPythonOperatorTest(unittest.TestCase):
         start_python_hook.assert_called_once_with(TASK_ID, expected_options,
                                                   mock.ANY, PY_OPTIONS)
         self.assertTrue(self.dataflow.py_file.startswith('/tmp/dataflow'))
+
+
+class DataFlowTemplateOperatorTest(unittest.TestCase):
+
+    def setUp(self):
+        self.dataflow = DataflowTemplateOperator(
+            task_id=TASK_ID,
+            template=TEMPLATE,
+            parameters=PARAMETERS,
+            dataflow_default_options=DEFAULT_OPTIONS_TEMPLATE,
+            poll_sleep=POLL_SLEEP)
+
+    def test_init(self):
+        """Test DataflowTemplateOperator instance is properly initialized."""
+        self.assertEqual(self.dataflow.task_id, TASK_ID)
+        self.assertEqual(self.dataflow.template, TEMPLATE)
+        self.assertEqual(self.dataflow.parameters, PARAMETERS)
+        self.assertEqual(self.dataflow.poll_sleep, POLL_SLEEP)
+        self.assertEqual(self.dataflow.dataflow_default_options,
+                         DEFAULT_OPTIONS_TEMPLATE)
+
+    @mock.patch('airflow.contrib.operators.dataflow_operator.DataFlowHook')
+    def test_exec(self, dataflow_mock):
+        """Test DataFlowHook is created and the right args are passed to
+        start_template_dataflow.
+
+        """
+        start_template_hook = dataflow_mock.return_value.start_template_dataflow
+        self.dataflow.execute(None)
+        self.assertTrue(dataflow_mock.called)
+        expected_options = {
+            'project': 'test',
+            'stagingLocation': 'gs://test/staging',
+            'tempLocation': 'gs://test/temp',
+            'zone': 'us-central1-f'
+        }
+        start_template_hook.assert_called_once_with(TASK_ID, expected_options,
+                                                    PARAMETERS, TEMPLATE)
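
The two new test cases can be loaded by name with the standard unittest loader; a small sketch, assuming it is run from the repository root with the test dependency mock installed:

```
import unittest

# Assumed module paths, matching the files touched in this commit.
suite = unittest.defaultTestLoader.loadTestsFromNames([
    'tests.contrib.hooks.test_gcp_dataflow_hook.DataFlowTemplateHookTest',
    'tests.contrib.operators.test_dataflow_operator.DataFlowTemplateOperatorTest',
])
unittest.TextTestRunner(verbosity=2).run(suite)
```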