You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/07/06 08:31:31 UTC

incubator-airflow git commit: [AIRFLOW-24] DataFlow Java Operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 35d07a8bd -> a9ee8ce98


[AIRFLOW-24] DataFlow Java Operator

Closes #1648 from alexvanboxel/AIRFLOW-24


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a9ee8ce9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a9ee8ce9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a9ee8ce9

Branch: refs/heads/master
Commit: a9ee8ce98bb0af355ba2b50955152afcd79f5134
Parents: 35d07a8
Author: Alex Van Boxel <al...@vanboxel.be>
Authored: Wed Jul 6 10:31:29 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jul 6 10:31:29 2016 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/__init__.py              |   1 +
 airflow/contrib/hooks/gcp_dataflow_hook.py     | 155 ++++++++++++++++++++
 airflow/contrib/operators/dataflow_operator.py | 105 +++++++++++++
 3 files changed, 261 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9ee8ce9/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index 237c831..a16a3f7 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -41,6 +41,7 @@ _hooks = {
     'gcs_hook': ['GoogleCloudStorageHook'],
     'datastore_hook': ['DatastoreHook'],
     'gcp_dataproc_hook': ['DataProcHook'],
+    'gcp_dataflow_hook': ['DataFlowHook'],
     'cloudant_hook': ['CloudantHook'],
     'fs_hook': ['FSHook']
 }

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9ee8ce9/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
new file mode 100644
index 0000000..7c35f3f
--- /dev/null
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -0,0 +1,155 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import select
+import subprocess
+import time
+import uuid
+
+from apiclient.discovery import build
+
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
+
+class _DataflowJob(object):
+    def __init__(self, dataflow, project_number, name):
+        self._dataflow = dataflow
+        self._project_number = project_number
+        self._job_name = name
+        self._job_id = None
+        self._job = self._get_job()
+
+    def _get_job_id_from_name(self):
+        jobs = self._dataflow.projects().jobs().list(
+            projectId=self._project_number
+        ).execute()
+        for job in jobs['jobs']:
+            if job['name'] == self._job_name:
+                self._job_id = job['id']
+                return job
+        return None
+
+    def _get_job(self):
+        if self._job_id is None:
+            job = self._get_job_id_from_name()
+        else:
+            job = self._dataflow.projects().jobs().get(projectId=self._project_number,
+                                                       jobId=self._job_id).execute()
+        if 'currentState' in job:
+            logging.info('Google Cloud DataFlow job %s is %s', job['name'],
+                         job['currentState'])
+        else:
+            logging.info('Google Cloud DataFlow with job_id %s has name %s', self._job_id,
+                         job['name'])
+        return job
+
+    def wait_for_done(self):
+        while True:
+            if 'currentState' in self._job:
+                if 'JOB_STATE_DONE' == self._job['currentState']:
+                    return True
+                elif 'JOB_STATE_FAILED' == self._job['currentState']:
+                    raise Exception("Google Cloud Dataflow job {} has failed.".format(
+                        self._job['name']))
+                elif 'JOB_STATE_CANCELLED' == self._job['currentState']:
+                    raise Exception("Google Cloud Dataflow job {} was cancelled.".format(
+                        self._job['name']))
+                elif 'JOB_STATE_RUNNING' == self._job['currentState']:
+                    time.sleep(10)
+                else:
+                    logging.debug(str(self._job))
+                    raise Exception(
+                        "Google Cloud Dataflow job {} was unknown state: {}".format(
+                            self._job['name'], self._job['currentState']))
+            else:
+                time.sleep(15)
+
+            self._job = self._get_job()
+
+    def get(self):
+        return self._job
+
+
+class _DataflowJava(object):
+    def __init__(self, cmd):
+        self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
+                                      stderr=subprocess.PIPE)
+
+    def _line(self, fd):
+        if fd == self._proc.stderr.fileno():
+            line = self._proc.stderr.readline()
+            return line
+        if fd == self._proc.stdout.fileno():
+            line = self._proc.stdout.readline()
+            return line
+
+    @staticmethod
+    def _extract_job(line):
+        if line is not None:
+            if line.startswith("Submitted job: "):
+                return line[15:-1]
+
+    def wait_for_done(self):
+        reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
+        logging.info("Start waiting for DataFlow process to complete.")
+        while self._proc.poll() is None:
+            ret = select.select(reads, [], [], 5)
+            if ret is not None:
+                for fd in ret[0]:
+                    line = self._line(fd)
+                    logging.debug(line[:-1])
+            else:
+                logging.info("Waiting for DataFlow process to complete.")
+        if self._proc.returncode is not 0:
+            raise Exception("DataFlow jar failed with return code {}".format(
+                self._proc.returncode))
+
+
+class DataFlowHook(GoogleCloudBaseHook):
+    def __init__(self,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None):
+        """
+        :param scope: The scope of the hook (read only, read write, etc). See:
+            https://cloud.google.com/storage/docs/authentication?hl=en#oauth-scopes
+        :type scope: string
+        """
+        super(DataFlowHook, self).__init__(gcp_conn_id, delegate_to)
+
+    def get_conn(self):
+        """
+        Returns a Google Cloud Storage service object.
+        """
+        http_authorized = self._authorize()
+        return build('dataflow', 'v1b3', http=http_authorized)
+
+    def start_java_dataflow(self, task_id, variables, dataflow):
+        name = task_id + "-" + str(uuid.uuid1())[:8]
+        cmd = self._build_cmd(task_id, variables, dataflow, name)
+        _DataflowJava(cmd).wait_for_done()
+        _DataflowJob(self.get_conn(), "vex-eu-cloud-sql-001", name).wait_for_done()
+
+    def _build_cmd(self, task_id, variables, dataflow, name):
+        command = ["java", "-jar",
+                   dataflow,
+                   "--runner=DataflowPipelineRunner",
+                   "--streaming=false",
+                   "--jobName=" + name]
+
+        if variables is not None:
+            for attr, value in variables.iteritems():
+                command.append("--" + attr + "=" + value)
+
+        return command

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9ee8ce9/airflow/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
new file mode 100644
index 0000000..8f61e18
--- /dev/null
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+
+from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataFlowJavaOperator(BaseOperator):
+    """
+    Start a Java Cloud DataFlow batch job. The parameters of the operation
+    will be passed to the job.
+
+    It's a good practice to define dataflow_* parameters in the default_args of the dag
+    like the project, zone and staging location.
+
+    ```
+    default_args = {
+        'dataflow_default_options': {
+            'project': 'my-gcp-project',
+            'zone': 'europe-west1-d',
+            'stagingLocation': 'gs://my-staging-bucket/staging/'
+        }
+    }
+    ```
+
+    You need to pass the path to your dataflow as a file reference with the ``jar``
+    parameter, the jar needs to be a self executing jar. Use ``options`` to pass on
+    options to your job.
+
+    ```
+    t1 = DataFlowOperation(
+        task_id='datapflow_example',
+        jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',
+        options={
+            'autoscalingAlgorithm': 'BASIC',
+            'maxNumWorkers': '50',
+            'start': '{{ds}}',
+            'partitionType': 'DAY'
+        },
+        dag=my-dag)
+    ```
+
+    Both ``jar`` and ``options`` are templated so you can use variables in them.
+    """
+    template_fields = ['options', 'jar']
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            jar,
+            dataflow_default_options={},
+            options={},
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new DataFlowJavaOperator.
+
+        For more detail on about job submission have a look at the reference:
+
+        https://cloud.google.com/dataflow/pipelines/specifying-exec-params
+
+        :param jar: The reference to a self executing DataFlow jar.
+        :type jar: string
+        :param dataflow_default_options: Map of default job options.
+        :type dataflow_default_options: dict
+        :param options: Map of job specific options.
+        :type options: dict
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have domain-wide
+            delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataFlowJavaOperator, self).__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.jar = jar
+        self.dataflow_default_options = dataflow_default_options
+        self.options = options
+
+    def execute(self, context):
+        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+
+        dataflow_options = copy.copy(self.dataflow_default_options)
+        dataflow_options.update(self.options)
+
+        hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)