Posted to commits@airflow.apache.org by bo...@apache.org on 2016/07/06 08:31:31 UTC
incubator-airflow git commit: [AIRFLOW-24] DataFlow Java Operator
Repository: incubator-airflow
Updated Branches:
refs/heads/master 35d07a8bd -> a9ee8ce98
[AIRFLOW-24] DataFlow Java Operator
Closes #1648 from alexvanboxel/AIRFLOW-24
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a9ee8ce9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a9ee8ce9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a9ee8ce9
Branch: refs/heads/master
Commit: a9ee8ce98bb0af355ba2b50955152afcd79f5134
Parents: 35d07a8
Author: Alex Van Boxel <al...@vanboxel.be>
Authored: Wed Jul 6 10:31:29 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jul 6 10:31:29 2016 +0200
----------------------------------------------------------------------
 airflow/contrib/hooks/__init__.py              |   1 +
 airflow/contrib/hooks/gcp_dataflow_hook.py     | 155 ++++++++++++++++++++
 airflow/contrib/operators/dataflow_operator.py | 105 +++++++++++++
 3 files changed, 261 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9ee8ce9/airflow/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py
index 237c831..a16a3f7 100644
--- a/airflow/contrib/hooks/__init__.py
+++ b/airflow/contrib/hooks/__init__.py
@@ -41,6 +41,7 @@ _hooks = {
     'gcs_hook': ['GoogleCloudStorageHook'],
     'datastore_hook': ['DatastoreHook'],
     'gcp_dataproc_hook': ['DataProcHook'],
+    'gcp_dataflow_hook': ['DataFlowHook'],
     'cloudant_hook': ['CloudantHook'],
     'fs_hook': ['FSHook']
 }
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9ee8ce9/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
new file mode 100644
index 0000000..7c35f3f
--- /dev/null
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -0,0 +1,155 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import select
+import subprocess
+import time
+import uuid
+
+from apiclient.discovery import build
+
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
+
+class _DataflowJob(object):
+    def __init__(self, dataflow, project_number, name):
+        self._dataflow = dataflow
+        self._project_number = project_number
+        self._job_name = name
+        self._job_id = None
+        self._job = self._get_job()
+
+    def _get_job_id_from_name(self):
+        jobs = self._dataflow.projects().jobs().list(
+            projectId=self._project_number
+        ).execute()
+        for job in jobs['jobs']:
+            if job['name'] == self._job_name:
+                self._job_id = job['id']
+                return job
+        return None
+
+    def _get_job(self):
+        if self._job_id is None:
+            job = self._get_job_id_from_name()
+        else:
+            job = self._dataflow.projects().jobs().get(projectId=self._project_number,
+                                                       jobId=self._job_id).execute()
+        if 'currentState' in job:
+            logging.info('Google Cloud DataFlow job %s is %s', job['name'],
+                         job['currentState'])
+        else:
+            logging.info('Google Cloud DataFlow with job_id %s has name %s', self._job_id,
+                         job['name'])
+        return job
+
+    def wait_for_done(self):
+        while True:
+            if 'currentState' in self._job:
+                if 'JOB_STATE_DONE' == self._job['currentState']:
+                    return True
+                elif 'JOB_STATE_FAILED' == self._job['currentState']:
+                    raise Exception("Google Cloud Dataflow job {} has failed.".format(
+                        self._job['name']))
+                elif 'JOB_STATE_CANCELLED' == self._job['currentState']:
+                    raise Exception("Google Cloud Dataflow job {} was cancelled.".format(
+                        self._job['name']))
+                elif 'JOB_STATE_RUNNING' == self._job['currentState']:
+                    time.sleep(10)
+                else:
+                    logging.debug(str(self._job))
+                    raise Exception(
+                        "Google Cloud Dataflow job {} is in an unknown state: {}".format(
+                            self._job['name'], self._job['currentState']))
+            else:
+                time.sleep(15)
+
+            # Refresh the job state before the next iteration of the polling loop.
+            self._job = self._get_job()
+
+    def get(self):
+        return self._job
+
+
+class _DataflowJava(object):
+    def __init__(self, cmd):
+        self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
+                                      stderr=subprocess.PIPE)
+
+    def _line(self, fd):
+        if fd == self._proc.stderr.fileno():
+            line = self._proc.stderr.readline()
+            return line
+        if fd == self._proc.stdout.fileno():
+            line = self._proc.stdout.readline()
+            return line
+
+    @staticmethod
+    def _extract_job(line):
+        # Expects console output of the form "Submitted job: <job_id>".
+        if line is not None:
+            if line.startswith("Submitted job: "):
+                return line[15:-1]
+
+    def wait_for_done(self):
+        reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
+        logging.info("Start waiting for DataFlow process to complete.")
+        while self._proc.poll() is None:
+            ret = select.select(reads, [], [], 5)
+            if ret is not None:
+                for fd in ret[0]:
+                    line = self._line(fd)
+                    logging.debug(line[:-1])
+            else:
+                logging.info("Waiting for DataFlow process to complete.")
+        if self._proc.returncode != 0:
+            raise Exception("DataFlow jar failed with return code {}".format(
+                self._proc.returncode))
+
+
+class DataFlowHook(GoogleCloudBaseHook):
+    def __init__(self,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None):
+        """
+        :param gcp_conn_id: The connection ID to use when connecting to
+            Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+        :type delegate_to: string
+        """
+        super(DataFlowHook, self).__init__(gcp_conn_id, delegate_to)
+
+    def get_conn(self):
+        """
+        Returns a Google Cloud Dataflow service object.
+        """
+        http_authorized = self._authorize()
+        return build('dataflow', 'v1b3', http=http_authorized)
+
+    def start_java_dataflow(self, task_id, variables, dataflow):
+        name = task_id + "-" + str(uuid.uuid1())[:8]
+        cmd = self._build_cmd(task_id, variables, dataflow, name)
+        _DataflowJava(cmd).wait_for_done()
+        # Poll the job in the project it was submitted to, taken from the
+        # job options rather than a hardcoded project id.
+        _DataflowJob(self.get_conn(), variables['project'], name).wait_for_done()
+
+    def _build_cmd(self, task_id, variables, dataflow, name):
+        command = ["java", "-jar",
+                   dataflow,
+                   "--runner=DataflowPipelineRunner",
+                   "--streaming=false",
+                   "--jobName=" + name]
+
+        if variables is not None:
+            for attr, value in variables.items():
+                command.append("--" + attr + "=" + value)
+
+        return command
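
For illustration (not part of the diff): the hook can also be exercised directly. A minimal sketch, assuming a hypothetical local jar path, project id and staging bucket:

```python
# Minimal sketch of driving DataFlowHook directly; the jar path, project id
# and staging bucket are hypothetical placeholders, not part of this commit.
from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook

hook = DataFlowHook(gcp_conn_id='google_cloud_default')

# _build_cmd turns this call into roughly:
#   java -jar /path/to/pipeline-example-1.0.jar --runner=DataflowPipelineRunner \
#        --streaming=false --jobName=dataflow_example-<uuid> \
#        --project=my-gcp-project --stagingLocation=gs://my-staging-bucket/staging/
# _DataflowJava waits for the process to exit, then _DataflowJob polls the
# Dataflow API until the job reaches JOB_STATE_DONE (raising on FAILED/CANCELLED).
hook.start_java_dataflow(
    task_id='dataflow_example',
    variables={
        'project': 'my-gcp-project',
        'stagingLocation': 'gs://my-staging-bucket/staging/'
    },
    dataflow='/path/to/pipeline-example-1.0.jar')
```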
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9ee8ce9/airflow/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
new file mode 100644
index 0000000..8f61e18
--- /dev/null
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+
+from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataFlowJavaOperator(BaseOperator):
+ """
+ Start a Java Cloud DataFlow batch job. The parameters of the operation
+ will be passed to the job.
+
+ It's a good practice to define dataflow_* parameters in the default_args of the dag
+ like the project, zone and staging location.
+
+ ```
+ default_args = {
+ 'dataflow_default_options': {
+ 'project': 'my-gcp-project',
+ 'zone': 'europe-west1-d',
+ 'stagingLocation': 'gs://my-staging-bucket/staging/'
+ }
+ }
+ ```
+
+ You need to pass the path to your dataflow as a file reference with the ``jar``
+ parameter, the jar needs to be a self executing jar. Use ``options`` to pass on
+ options to your job.
+
+ ```
+ t1 = DataFlowOperation(
+ task_id='datapflow_example',
+ jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',
+ options={
+ 'autoscalingAlgorithm': 'BASIC',
+ 'maxNumWorkers': '50',
+ 'start': '{{ds}}',
+ 'partitionType': 'DAY'
+ },
+ dag=my-dag)
+ ```
+
+ Both ``jar`` and ``options`` are templated so you can use variables in them.
+ """
+    template_fields = ['options', 'jar']
+    ui_color = '#0273d4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            jar,
+            dataflow_default_options=None,
+            options=None,
+            gcp_conn_id='google_cloud_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new DataFlowJavaOperator.
+
+        For more detail on job submission have a look at the reference:
+
+        https://cloud.google.com/dataflow/pipelines/specifying-exec-params
+
+        :param jar: The reference to a self-executing DataFlow jar.
+        :type jar: string
+        :param dataflow_default_options: Map of default job options.
+        :type dataflow_default_options: dict
+        :param options: Map of job specific options.
+        :type options: dict
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :type gcp_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataFlowJavaOperator, self).__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.jar = jar
+        # Use empty dicts as fallbacks rather than mutable default arguments.
+        self.dataflow_default_options = dataflow_default_options or {}
+        self.options = options or {}
+
+    def execute(self, context):
+        hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
+
+        # Job-specific options override the DAG-level defaults.
+        dataflow_options = copy.copy(self.dataflow_default_options)
+        dataflow_options.update(self.options)
+
+        hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
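
For illustration (not part of the diff): a complete DAG wiring the new operator together. This is a sketch using the placeholder values from the docstring above; the DAG id, schedule and dates are hypothetical.

```python
# End-to-end sketch using DataFlowJavaOperator; project, zone, bucket and jar
# location are placeholders taken from the docstring examples, not this commit.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

default_args = {
    'start_date': datetime(2016, 7, 1),
    # Picked up by the operator and merged with the task-level options.
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'europe-west1-d',
        'stagingLocation': 'gs://my-staging-bucket/staging/'
    }
}

dag = DAG('dataflow_example_dag', default_args=default_args,
          schedule_interval='@daily')

task = DataFlowJavaOperator(
    task_id='dataflow_example',
    # Templated: resolved from an Airflow Variable at runtime.
    jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY'
    },
    dag=dag)
```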