Posted to commits@airflow.apache.org by cr...@apache.org on 2016/10/24 14:47:09 UTC
incubator-airflow git commit: [AIRFLOW-589] Add templatable job_name
Repository: incubator-airflow
Updated Branches:
refs/heads/master c5f663387 -> c65f403a5
[AIRFLOW-589] Add templatable job_name
The job name is the name that will appear in the DataProc web console.
It's helpful to have a one-to-one mapping between the Airflow task and
the job running on the cluster. Adding a templated parameter allows you
to customize how Airflow constructs the job name. The default is
{{task.task_id}} + {{ds_nodash}} + a random hash.
Closes #1847 from alexvanboxel/feature/airflow-589-dataproc-templated-job-name
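For illustration, a minimal sketch of how the new parameter might be used
in a DAG. The DAG definition and GCS paths below are assumptions for the
example, not part of this commit:

```python
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import DataProcPigOperator

# Hypothetical DAG, for illustration only.
dag = DAG('dataproc_example', start_date=datetime(2016, 10, 1))

t1 = DataProcPigOperator(
    task_id='dataproc_pig',
    query='a_pig_script.pig',
    variables={'out': 'gs://example/output/{{ds}}'},
    # Overrides the default '{{task.task_id}}_{{ds_nodash}}'. The hook
    # still appends a random suffix to keep job ids unique across runs.
    job_name='pig_{{ds_nodash}}',
    dataproc_cluster='cluster-1',
    dag=dag)
```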
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c65f403a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c65f403a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c65f403a
Branch: refs/heads/master
Commit: c65f403a532941136aa62fa978f18ba82ce0ae7d
Parents: c5f6633
Author: Alex Van Boxel <al...@vanboxel.be>
Authored: Mon Oct 24 07:46:47 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Mon Oct 24 07:46:53 2016 -0700
----------------------------------------------------------------------
airflow/contrib/hooks/gcp_dataproc_hook.py | 3 +
airflow/contrib/operators/dataproc_operator.py | 106 ++++++++++++++------
2 files changed, 77 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c65f403a/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index fab71c5..e77c951 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -126,6 +126,9 @@ class _DataProcJobBuilder:
def set_python_main(self, main):
self.job["job"][self.job_type]["mainPythonFileUri"] = main
+ def set_job_name(self, name):
+ self.job["job"]["reference"]["jobId"] = name + "_" + str(uuid.uuid1())[:8]
+
def build(self):
return self.job
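As a standalone sketch of the naming behaviour this method adds (assuming
`uuid` is already imported in the hook module, as the three-line diff
suggests):

```python
import uuid

def make_job_id(name):
    # Mirrors set_job_name: the rendered job_name gets the first 8
    # characters of a uuid1 appended, so repeated runs with the same
    # rendered name do not collide on the cluster.
    return name + "_" + str(uuid.uuid1())[:8]

# e.g. 'dataproc_pig_20161024' -> 'dataproc_pig_20161024_40f2a81c'
print(make_job_id('dataproc_pig_20161024'))
```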
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c65f403a/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 33e5f79..2955d26 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -43,11 +43,11 @@ class DataProcPigOperator(BaseOperator):
t1 = DataProcPigOperator(
task_id='dataproc_pig',
query='a_pig_script.pig',
- variables={'out': 'gs://example/output/{ds}'},
+ variables={'out': 'gs://example/output/{{ds}}'},
dag=dag)
```
"""
- template_fields = ['query', 'variables']
+ template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster']
template_ext = ('.pg', '.pig',)
ui_color = '#0273d4'
@@ -56,6 +56,7 @@ class DataProcPigOperator(BaseOperator):
self,
query,
variables=None,
+ job_name='{{task.task_id}}_{{ds_nodash}}',
dataproc_cluster='cluster-1',
dataproc_pig_properties=None,
dataproc_pig_jars=None,
@@ -74,6 +75,10 @@ class DataProcPigOperator(BaseOperator):
:type query: string
:param variables: Map of named parameters for the query.
:type variables: dict
+ :param job_name: The job name used in the DataProc cluster. This name by default
+ is the task_id appended with the execution date, but can be templated. The
+ name will always be appended with a random number to avoid name clashes.
+ :type job_name: string
:param dataproc_cluster: The id of the DataProc cluster.
:type dataproc_cluster: string
:param dataproc_pig_properties: Map for the Pig properties. Ideal to put in
@@ -94,6 +99,7 @@ class DataProcPigOperator(BaseOperator):
self.delegate_to = delegate_to
self.query = query
self.variables = variables
+ self.job_name = job_name
self.dataproc_cluster = dataproc_cluster
self.dataproc_properties = dataproc_pig_properties
self.dataproc_jars = dataproc_pig_jars
@@ -107,6 +113,7 @@ class DataProcPigOperator(BaseOperator):
job.add_query(self.query)
job.add_variables(self.variables)
job.add_jar_file_uris(self.dataproc_jars)
+ job.set_job_name(self.job_name)
hook.submit(hook.project_id, job.build())
@@ -115,7 +122,7 @@ class DataProcHiveOperator(BaseOperator):
"""
Start a Hive query Job on a Cloud DataProc cluster.
"""
- template_fields = ['query', 'variables']
+ template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster']
template_ext = ('.q',)
ui_color = '#0273d4'
@@ -124,6 +131,7 @@ class DataProcHiveOperator(BaseOperator):
self,
query,
variables=None,
+ job_name='{{task.task_id}}_{{ds_nodash}}',
dataproc_cluster='cluster-1',
dataproc_hive_properties=None,
dataproc_hive_jars=None,
@@ -138,6 +146,10 @@ class DataProcHiveOperator(BaseOperator):
:type query: string
:param variables: Map of named parameters for the query.
:type variables: dict
+ :param job_name: The job name used in the DataProc cluster. This name by default
+ is the task_id appended with the execution date, but can be templated. The
+ name will always be appended with a random number to avoid name clashes.
+ :type job_name: string
:param dataproc_cluster: The id of the DataProc cluster.
:type dataproc_cluster: string
:param dataproc_hive_properties: Map for the Hive properties. Ideal to put in
@@ -158,6 +170,7 @@ class DataProcHiveOperator(BaseOperator):
self.delegate_to = delegate_to
self.query = query
self.variables = variables
+ self.job_name = job_name
self.dataproc_cluster = dataproc_cluster
self.dataproc_properties = dataproc_hive_properties
self.dataproc_jars = dataproc_hive_jars
@@ -172,6 +185,7 @@ class DataProcHiveOperator(BaseOperator):
job.add_query(self.query)
job.add_variables(self.variables)
job.add_jar_file_uris(self.dataproc_jars)
+ job.set_job_name(self.job_name)
hook.submit(hook.project_id, job.build())
@@ -180,7 +194,7 @@ class DataProcSparkSqlOperator(BaseOperator):
"""
Start a Spark SQL query Job on a Cloud DataProc cluster.
"""
- template_fields = ['query', 'variables']
+ template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster']
template_ext = ('.q',)
ui_color = '#0273d4'
@@ -189,6 +203,7 @@ class DataProcSparkSqlOperator(BaseOperator):
self,
query,
variables=None,
+ job_name='{{task.task_id}}_{{ds_nodash}}',
dataproc_cluster='cluster-1',
dataproc_spark_properties=None,
dataproc_spark_jars=None,
@@ -203,6 +218,10 @@ class DataProcSparkSqlOperator(BaseOperator):
:type query: string
:param variables: Map of named parameters for the query.
:type variables: dict
+ :param job_name: The job name used in the DataProc cluster. This name by default
+ is the task_id appended with the execution date, but can be templated. The
+ name will always be appended with a random number to avoid name clashes.
+ :type job_name: string
:param dataproc_cluster: The id of the DataProc cluster.
:type dataproc_cluster: string
:param dataproc_spark_properties: Map for the Spark properties. Ideal to put in
@@ -223,6 +242,7 @@ class DataProcSparkSqlOperator(BaseOperator):
self.delegate_to = delegate_to
self.query = query
self.variables = variables
+ self.job_name = job_name
self.dataproc_cluster = dataproc_cluster
self.dataproc_properties = dataproc_spark_properties
self.dataproc_jars = dataproc_spark_jars
@@ -237,6 +257,7 @@ class DataProcSparkSqlOperator(BaseOperator):
job.add_query(self.query)
job.add_variables(self.variables)
job.add_jar_file_uris(self.dataproc_jars)
+ job.set_job_name(self.job_name)
hook.submit(hook.project_id, job.build())
@@ -246,7 +267,7 @@ class DataProcSparkOperator(BaseOperator):
Start a Spark Job on a Cloud DataProc cluster.
"""
- template_fields = ['arguments']
+ template_fields = ['arguments', 'job_name', 'dataproc_cluster']
ui_color = '#0273d4'
@apply_defaults
@@ -257,6 +278,7 @@ class DataProcSparkOperator(BaseOperator):
arguments=None,
archives=None,
files=None,
+ job_name='{{task.task_id}}_{{ds_nodash}}',
dataproc_cluster='cluster-1',
dataproc_spark_properties=None,
dataproc_spark_jars=None,
@@ -280,6 +302,10 @@ class DataProcSparkOperator(BaseOperator):
:type archives: list
:param files: List of files to be copied to the working directory
:type files: list
+ :param job_name: The job name used in the DataProc cluster. This name by default
+ is the task_id appended with the execution date, but can be templated. The
+ name will always be appended with a random number to avoid name clashes.
+ :type job_name: string
:param dataproc_cluster: The id of the DataProc cluster.
:type dataproc_cluster: string
:param dataproc_spark_properties: Map for the Spark properties. Ideal to put in
@@ -303,6 +329,7 @@ class DataProcSparkOperator(BaseOperator):
self.arguments = arguments
self.archives = archives
self.files = files
+ self.job_name = job_name
self.dataproc_cluster = dataproc_cluster
self.dataproc_properties = dataproc_spark_properties
self.dataproc_jars = dataproc_spark_jars
@@ -318,6 +345,7 @@ class DataProcSparkOperator(BaseOperator):
job.add_jar_file_uris(self.dataproc_jars)
job.add_archive_uris(self.archives)
job.add_file_uris(self.files)
+ job.set_job_name(self.job_name)
hook.submit(hook.project_id, job.build())
@@ -327,7 +355,7 @@ class DataProcHadoopOperator(BaseOperator):
Start a Hadoop Job on a Cloud DataProc cluster.
"""
- template_fields = ['arguments']
+ template_fields = ['arguments', 'job_name', 'dataproc_cluster']
ui_color = '#0273d4'
@apply_defaults
@@ -338,6 +366,7 @@ class DataProcHadoopOperator(BaseOperator):
arguments=None,
archives=None,
files=None,
+ job_name='{{task.task_id}}_{{ds_nodash}}',
dataproc_cluster='cluster-1',
dataproc_hadoop_properties=None,
dataproc_hadoop_jars=None,
@@ -361,6 +390,10 @@ class DataProcHadoopOperator(BaseOperator):
:type archives: list
:param files: List of files to be copied to the working directory
:type files: list
+ :param job_name: The job name used in the DataProc cluster. This name by default
+ is the task_id appended with the execution date, but can be templated. The
+ name will always be appended with a random number to avoid name clashes.
+ :type job_name: string
:param dataproc_cluster: The id of the DataProc cluster.
:type dataproc_cluster: string
:param dataproc_hadoop_properties: Map for the Hadoop properties. Ideal to put in
@@ -384,6 +417,7 @@ class DataProcHadoopOperator(BaseOperator):
self.arguments = arguments
self.archives = archives
self.files = files
+ self.job_name = job_name
self.dataproc_cluster = dataproc_cluster
self.dataproc_properties = dataproc_hadoop_properties
self.dataproc_jars = dataproc_hadoop_jars
@@ -399,6 +433,7 @@ class DataProcHadoopOperator(BaseOperator):
job.add_jar_file_uris(self.dataproc_jars)
job.add_archive_uris(self.archives)
job.add_file_uris(self.files)
+ job.set_job_name(self.job_name)
hook.submit(hook.project_id, job.build())
@@ -408,7 +443,7 @@ class DataProcPySparkOperator(BaseOperator):
Start a PySpark Job on a Cloud DataProc cluster.
"""
- template_fields = ['arguments']
+ template_fields = ['arguments', 'job_name', 'dataproc_cluster']
ui_color = '#0273d4'
@apply_defaults
@@ -419,6 +454,7 @@ class DataProcPySparkOperator(BaseOperator):
archives=None,
pyfiles=None,
files=None,
+ job_name='{{task.task_id}}_{{ds_nodash}}',
dataproc_cluster='cluster-1',
dataproc_pyspark_properties=None,
dataproc_pyspark_jars=None,
@@ -427,35 +463,39 @@ class DataProcPySparkOperator(BaseOperator):
*args,
**kwargs):
"""
- Create a new DataProcPySparkOperator.
+ Create a new DataProcPySparkOperator.
- :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
+ :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
Python file to use as the driver. Must be a .py file.
- :type main: string
- :param arguments: Arguments for the job.
- :type arguments: list
- :param archives: List of archived files that will be unpacked in the work
+ :type main: string
+ :param arguments: Arguments for the job.
+ :type arguments: list
+ :param archives: List of archived files that will be unpacked in the work
directory. Should be stored in Cloud Storage.
- :type archives: list
- :param files: List of files to be copied to the working directory
- :type files: list
- :param pyfiles: List of Python files to pass to the PySpark framework.
+ :type archives: list
+ :param files: List of files to be copied to the working directory
+ :type files: list
+ :param pyfiles: List of Python files to pass to the PySpark framework.
Supported file types: .py, .egg, and .zip
- :type pyfiles: list
- :param dataproc_cluster: The id of the DataProc cluster.
- :type dataproc_cluster: string
- :param dataproc_pyspark_properties: Map for the Pig properties. Ideal to put in
- default arguments
- :type dataproc_pyspark_properties: dict
- :param dataproc_pyspark_jars: URIs to jars provisioned in Cloud Storage (example:
- for UDFs and libs) and are ideal to put in default arguments.
- :type dataproc_pyspark_jars: list
- :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
- :type gcp_conn_id: string
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
- domain-wide delegation enabled.
- :type delegate_to: string
+ :type pyfiles: list
+ :param job_name: The job name used in the DataProc cluster. This name by default
+ is the task_id appended with the execution date, but can be templated. The
+ name will always be appended with a random number to avoid name clashes.
+ :type job_name: string
+ :param dataproc_cluster: The id of the DataProc cluster.
+ :type dataproc_cluster: string
+ :param dataproc_pyspark_properties: Map for the PySpark properties. Ideal to put in
+ default arguments
+ :type dataproc_pyspark_properties: dict
+ :param dataproc_pyspark_jars: URIs to jars provisioned in Cloud Storage (example:
+ for UDFs and libs) and are ideal to put in default arguments.
+ :type dataproc_pyspark_jars: list
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have
+ domain-wide delegation enabled.
+ :type delegate_to: string
"""
super(DataProcPySparkOperator, self).__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
@@ -465,6 +505,7 @@ class DataProcPySparkOperator(BaseOperator):
self.archives = archives
self.files = files
self.pyfiles = pyfiles
+ self.job_name = job_name
self.dataproc_cluster = dataproc_cluster
self.dataproc_properties = dataproc_pyspark_properties
self.dataproc_jars = dataproc_pyspark_jars
@@ -481,5 +522,6 @@ class DataProcPySparkOperator(BaseOperator):
job.add_archive_uris(self.archives)
job.add_file_uris(self.files)
job.add_python_file_uris(self.pyfiles)
+ job.set_job_name(self.job_name)
hook.submit(hook.project_id, job.build())
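Note that `dataproc_cluster` joins `job_name` in `template_fields` for all
six operators, so the target cluster can be templated too. A hedged sketch
(the cluster naming scheme and driver path are assumptions):

```python
from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator

t2 = DataProcPySparkOperator(
    task_id='pyspark_daily',
    main='gs://example/jobs/job.py',        # hypothetical driver path
    job_name='pyspark_{{ds_nodash}}',       # rendered through Jinja
    dataproc_cluster='etl-{{ds_nodash}}',   # cluster id is now templatable too
    dag=dag)                                # dag as defined in the earlier sketch
```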