Posted to commits@airflow.apache.org by cr...@apache.org on 2018/01/12 17:00:05 UTC
incubator-airflow git commit: [AIRFLOW-1997] Fix GCP operator doc strings
Repository: incubator-airflow
Updated Branches:
refs/heads/master a2bb2d70a -> b48bbbd6f
[AIRFLOW-1997] Fix GCP operator doc strings
Closes #2939 from kaxil/docstring_fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b48bbbd6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b48bbbd6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b48bbbd6
Branch: refs/heads/master
Commit: b48bbbd6f1879f45f50d130ab754a74346fdbf92
Parents: a2bb2d7
Author: Kaxil Naik <ka...@gmail.com>
Authored: Fri Jan 12 08:59:40 2018 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Jan 12 08:59:47 2018 -0800
----------------------------------------------------------------------
airflow/contrib/operators/dataproc_operator.py | 548 ++++++++++----------
airflow/contrib/operators/gcs_to_bq.py | 191 ++++---
2 files changed, 359 insertions(+), 380 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b48bbbd6/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 9c1eb0f..3b10382 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -33,9 +33,66 @@ class DataprocClusterCreateOperator(BaseOperator):
for a detailed explanation on the different parameters. Most of the configuration
parameters detailed in the link are available as a parameter to this operator.
+
+ :param cluster_name: The name of the DataProc cluster to create.
+ :type cluster_name: string
+ :param project_id: The ID of the google cloud project in which
+ to create the cluster
+ :type project_id: string
+ :param num_workers: The number of workers to spin up
+ :type num_workers: int
+ :param storage_bucket: The storage bucket to use, setting to None lets dataproc
+ generate a custom one for you
+ :type storage_bucket: string
+ :param init_actions_uris: List of GCS URIs containing
+ dataproc initialization scripts
+ :type init_actions_uris: list[string]
+ :param metadata: dict of key-value google compute engine metadata entries
+ to add to all instances
+ :type metadata: dict
+ :param image_version: the version of software inside the Dataproc cluster
+ :type image_version: string
+ :param properties: dict of properties to set on
+ config files (e.g. spark-defaults.conf), see
+ https://cloud.google.com/dataproc/docs/reference/rest/v1/ \
+ projects.regions.clusters#SoftwareConfig
+ :type properties: dict
+ :param master_machine_type: Compute engine machine type to use for the master node
+ :type master_machine_type: string
+ :param master_disk_size: Disk size for the master node
+ :type master_disk_size: int
+ :param worker_machine_type: Compute engine machine type to use for the worker nodes
+ :type worker_machine_type: string
+ :param worker_disk_size: Disk size for the worker nodes
+ :type worker_disk_size: int
+ :param num_preemptible_workers: The number of preemptible worker nodes to spin up
+ :type num_preemptible_workers: int
+ :param labels: dict of labels to add to the cluster
+ :type labels: dict
+ :param zone: The zone where the cluster will be located
+ :type zone: string
+ :param network_uri: The network uri to be used for machine communication, cannot be
+ specified with subnetwork_uri
+ :type network_uri: string
+ :param subnetwork_uri: The subnetwork uri to be used for machine communication,
+ cannot be specified with network_uri
+ :type subnetwork_uri: string
+ :param tags: The GCE tags to add to all instances
+ :type tags: list[string]
+ :param region: leave as 'global', might become relevant in the future
+ :type region: string
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have domain-wide
+ delegation enabled.
+ :type delegate_to: string
+ :param service_account: The service account of the dataproc instances.
+ :type service_account: string
+ :param service_account_scopes: The URIs of service account scopes to be included.
+ :type service_account_scopes: list[string]
"""
- template_fields = ['cluster_name',]
+ template_fields = ['cluster_name', ]
@apply_defaults
def __init__(self,
@@ -64,70 +121,7 @@ class DataprocClusterCreateOperator(BaseOperator):
service_account_scopes=None,
*args,
**kwargs):
- """
- Create a new DataprocClusterCreateOperator.
-
- For more info on the creation of a cluster through the API, have a look at:
-
- https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters
-
- :param cluster_name: The name of the DataProc cluster to create.
- :type cluster_name: string
- :param project_id: The ID of the google cloud project in which
- to create the cluster
- :type project_id: string
- :param num_workers: The # of workers to spin up
- :type num_workers: int
- :param storage_bucket: The storage bucket to use, setting to None lets dataproc
- generate a custom one for you
- :type storage_bucket: string
- :param init_actions_uris: List of GCS uri's containing
- dataproc initialization scripts
- :type init_actions_uris: list[string]
- :param metadata: dict of key-value google compute engine metadata entries
- to add to all instances
- :type metadata: dict
- :param image_version: the version of software inside the Dataproc cluster
- :type image_version: string
- :param properties: dict of properties to set on
- config files (e.g. spark-defaults.conf), see
- https://cloud.google.com/dataproc/docs/reference/rest/v1/ \
- projects.regions.clusters#SoftwareConfig
- :type properties: dict
- :param master_machine_type: Compute engine machine type to use for the master node
- :type master_machine_type: string
- :param master_disk_size: Disk size for the master node
- :type int
- :param worker_machine_type:Compute engine machine type to use for the worker nodes
- :type worker_machine_type: string
- :param worker_disk_size: Disk size for the worker nodes
- :type worker_disk_size: int
- :param num_preemptible_workers: The # of preemptible worker nodes to spin up
- :type num_preemptible_workers: int
- :param labels: dict of labels to add to the cluster
- :type labels: dict
- :param zone: The zone where the cluster will be located
- :type zone: string
- :param network_uri: The network uri to be used for machine communication, cannot be
- specified with subnetwork_uri
- :type network_uri: string
- :param subnetwork_uri: The subnetwork uri to be used for machine communication, cannot be
- specified with network_uri
- :type subnetwork_uri: string
- :param tags: The GCE tags to add to all instances
- :type tags: list[string]
- :param region: leave as 'global', might become relevant in the future
- :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
- :type gcp_conn_id: string
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have domain-wide
- delegation enabled.
- :type delegate_to: string
- :param service_account: The service account of the dataproc instances.
- :type service_account: string
- :param service_account_scopes: The URIs of service account scopes to be included.
- :type service_account_scopes: list[string]
- """
+
super(DataprocClusterCreateOperator, self).__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
@@ -326,6 +320,20 @@ class DataprocClusterDeleteOperator(BaseOperator):
"""
Delete a cluster on Google Cloud Dataproc. The operator will wait until the
cluster is destroyed.
+
+ :param cluster_name: The name of the cluster to delete.
+ :type cluster_name: string
+ :param project_id: The ID of the google cloud project in which
+ the cluster runs
+ :type project_id: string
+ :param region: leave as 'global', might become relevant in the future
+ :type region: string
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have domain-wide
+ delegation enabled.
+ :type delegate_to: string
"""
template_fields = ['cluster_name']
@@ -339,23 +347,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
delegate_to=None,
*args,
**kwargs):
- """
- Delete a cluster on Google Cloud Dataproc.
-
- :param cluster_name: The name of the cluster to create.
- :type cluster_name: string
- :param project_id: The ID of the google cloud project in which
- the cluster runs
- :type project_id: string
- :param region: leave as 'global', might become relevant in the future
- :type region: string
- :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
- :type gcp_conn_id: string
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have domain-wide
- delegation enabled.
- :type delegate_to: string
- """
+
super(DataprocClusterDeleteOperator, self).__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
@@ -424,6 +416,37 @@ class DataProcPigOperator(BaseOperator):
variables={'out': 'gs://example/output/{{ds}}'},
dag=dag)
```
+
+ For more detail about job submission, have a look at the reference:
+
+ https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs
+
+ :param query: The query or reference to the query file (pg or pig extension).
+ :type query: string
+ :param query_uri: The uri of a pig script on Cloud Storage.
+ :type query_uri: string
+ :param variables: Map of named parameters for the query.
+ :type variables: dict
+ :param job_name: The job name used in the DataProc cluster. By default this name
+ is the task_id appended with the execution date, but it can be templated. The
+ name will always be appended with a random number to avoid name clashes.
+ :type job_name: string
+ :param cluster_name: The name of the DataProc cluster.
+ :type cluster_name: string
+ :param dataproc_pig_properties: Map for the Pig properties. Ideal to put in
+ default arguments
+ :type dataproc_pig_properties: dict
+ :param dataproc_pig_jars: URIs to jars provisioned in Cloud Storage (example: for
+ UDFs and libs) and are ideal to put in default arguments.
+ :type dataproc_pig_jars: list
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have domain-wide
+ delegation enabled.
+ :type delegate_to: string
+ :param region: The specified region where the dataproc cluster is created.
+ :type region: string
"""
template_fields = ['query', 'variables', 'job_name', 'cluster_name']
template_ext = ('.pg', '.pig',)
@@ -444,40 +467,7 @@ class DataProcPigOperator(BaseOperator):
region='global',
*args,
**kwargs):
- """
- Create a new DataProcPigOperator.
-
- For more detail on about job submission have a look at the reference:
-
- https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs
-
- :param query: The query or reference to the query file (pg or pig extension).
- :type query: string
- :param query_uri: The uri of a pig script on Cloud Storage.
- :type query_uri: string
- :param variables: Map of named parameters for the query.
- :type variables: dict
- :param job_name: The job name used in the DataProc cluster. This name by default
- is the task_id appended with the execution data, but can be templated. The
- name will always be appended with a random number to avoid name clashes.
- :type job_name: string
- :param cluster_name: The name of the DataProc cluster.
- :type cluster_name: string
- :param dataproc_pig_properties: Map for the Pig properties. Ideal to put in
- default arguments
- :type dataproc_pig_properties: dict
- :param dataproc_pig_jars: URIs to jars provisioned in Cloud Storage (example: for
- UDFs and libs) and are ideal to put in default arguments.
- :type dataproc_pig_jars: list
- :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
- :type gcp_conn_id: string
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have domain-wide
- delegation enabled.
- :type delegate_to: string
- :param region: The specified region where the dataproc cluster is created.
- :type region: string
- """
+
super(DataProcPigOperator, self).__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
@@ -510,6 +500,33 @@ class DataProcPigOperator(BaseOperator):
class DataProcHiveOperator(BaseOperator):
"""
Start a Hive query Job on a Cloud DataProc cluster.
+
+ :param query: The query or reference to the query file (q extension).
+ :type query: string
+ :param query_uri: The uri of a hive script on Cloud Storage.
+ :type query_uri: string
+ :param variables: Map of named parameters for the query.
+ :type variables: dict
+ :param job_name: The job name used in the DataProc cluster. By default this name
+ is the task_id appended with the execution date, but it can be templated. The
+ name will always be appended with a random number to avoid name clashes.
+ :type job_name: string
+ :param cluster_name: The name of the DataProc cluster.
+ :type cluster_name: string
+ :param dataproc_hive_properties: Map for the Hive properties. Ideal to put in
+ default arguments
+ :type dataproc_hive_properties: dict
+ :param dataproc_hive_jars: URIs to jars provisioned in Cloud Storage (example: for
+ UDFs and libs) and are ideal to put in default arguments.
+ :type dataproc_hive_jars: list
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have domain-wide
+ delegation enabled.
+ :type delegate_to: string
+ :param region: The specified region where the dataproc cluster is created.
+ :type region: string
"""
template_fields = ['query', 'variables', 'job_name', 'cluster_name']
template_ext = ('.q',)
@@ -530,36 +547,7 @@ class DataProcHiveOperator(BaseOperator):
region='global',
*args,
**kwargs):
- """
- Create a new DataProcHiveOperator.
-
- :param query: The query or reference to the query file (q extension).
- :type query: string
- :param query_uri: The uri of a hive script on Cloud Storage.
- :type query_uri: string
- :param variables: Map of named parameters for the query.
- :type variables: dict
- :param job_name: The job name used in the DataProc cluster. This name by default
- is the task_id appended with the execution data, but can be templated. The
- name will always be appended with a random number to avoid name clashes.
- :type job_name: string
- :param cluster_name: The name of the DataProc cluster.
- :type cluster_name: string
- :param dataproc_hive_properties: Map for the Pig properties. Ideal to put in
- default arguments
- :type dataproc_hive_properties: dict
- :param dataproc_hive_jars: URIs to jars provisioned in Cloud Storage (example: for
- UDFs and libs) and are ideal to put in default arguments.
- :type dataproc_hive_jars: list
- :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
- :type gcp_conn_id: string
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have domain-wide
- delegation enabled.
- :type delegate_to: string
- :param region: The specified region where the dataproc cluster is created.
- :type region: string
- """
+
super(DataProcHiveOperator, self).__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
@@ -593,6 +581,33 @@ class DataProcHiveOperator(BaseOperator):
class DataProcSparkSqlOperator(BaseOperator):
"""
Start a Spark SQL query Job on a Cloud DataProc cluster.
+
+ :param query: The query or reference to the query file (q extension).
+ :type query: string
+ :param query_uri: The uri of a spark sql script on Cloud Storage.
+ :type query_uri: string
+ :param variables: Map of named parameters for the query.
+ :type variables: dict
+ :param job_name: The job name used in the DataProc cluster. By default this name
+ is the task_id appended with the execution date, but it can be templated. The
+ name will always be appended with a random number to avoid name clashes.
+ :type job_name: string
+ :param cluster_name: The name of the DataProc cluster.
+ :type cluster_name: string
+ :param dataproc_spark_properties: Map for the Spark properties. Ideal to put in
+ default arguments
+ :type dataproc_spark_properties: dict
+ :param dataproc_spark_jars: URIs to jars provisioned in Cloud Storage (example:
+ for UDFs and libs) and are ideal to put in default arguments.
+ :type dataproc_spark_jars: list
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have domain-wide
+ delegation enabled.
+ :type delegate_to: string
+ :param region: The specified region where the dataproc cluster is created.
+ :type region: string
"""
template_fields = ['query', 'variables', 'job_name', 'cluster_name']
template_ext = ('.q',)
@@ -613,36 +628,7 @@ class DataProcSparkSqlOperator(BaseOperator):
region='global',
*args,
**kwargs):
- """
- Create a new DataProcSparkSqlOperator.
-
- :param query: The query or reference to the query file (q extension).
- :type query: string
- :param query_uri: The uri of a spark sql script on Cloud Storage.
- :type query_uri: string
- :param variables: Map of named parameters for the query.
- :type variables: dict
- :param job_name: The job name used in the DataProc cluster. This name by default
- is the task_id appended with the execution data, but can be templated. The
- name will always be appended with a random number to avoid name clashes.
- :type job_name: string
- :param cluster_name: The name of the DataProc cluster.
- :type cluster_name: string
- :param dataproc_spark_properties: Map for the Pig properties. Ideal to put in
- default arguments
- :type dataproc_spark_properties: dict
- :param dataproc_spark_jars: URIs to jars provisioned in Cloud Storage (example:
- for UDFs and libs) and are ideal to put in default arguments.
- :type dataproc_spark_jars: list
- :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
- :type gcp_conn_id: string
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have domain-wide
- delegation enabled.
- :type delegate_to: string
- :param region: The specified region where the dataproc cluster is created.
- :type region: string
- """
+
super(DataProcSparkSqlOperator, self).__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
@@ -676,6 +662,40 @@ class DataProcSparkSqlOperator(BaseOperator):
class DataProcSparkOperator(BaseOperator):
"""
Start a Spark Job on a Cloud DataProc cluster.
+
+ :param main_jar: URI of the job jar provisioned on Cloud Storage. (use this or
+ the main_class, not both together).
+ :type main_jar: string
+ :param main_class: Name of the job class. (use this or the main_jar, not both
+ together).
+ :type main_class: string
+ :param arguments: Arguments for the job.
+ :type arguments: list
+ :param archives: List of archived files that will be unpacked in the work
+ directory. Should be stored in Cloud Storage.
+ :type archives: list
+ :param files: List of files to be copied to the working directory
+ :type files: list
+ :param job_name: The job name used in the DataProc cluster. By default this name
+ is the task_id appended with the execution date, but it can be templated. The
+ name will always be appended with a random number to avoid name clashes.
+ :type job_name: string
+ :param cluster_name: The name of the DataProc cluster.
+ :type cluster_name: string
+ :param dataproc_spark_properties: Map for the Spark properties. Ideal to put in
+ default arguments
+ :type dataproc_spark_properties: dict
+ :param dataproc_spark_jars: URIs to jars provisioned in Cloud Storage (example:
+ for UDFs and libs) and are ideal to put in default arguments.
+ :type dataproc_spark_jars: list
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have domain-wide
+ delegation enabled.
+ :type delegate_to: string
+ :param region: The specified region where the dataproc cluster is created.
+ :type region: string
"""
template_fields = ['arguments', 'job_name', 'cluster_name']
@@ -698,43 +718,7 @@ class DataProcSparkOperator(BaseOperator):
region='global',
*args,
**kwargs):
- """
- Create a new DataProcSparkOperator.
- :param main_jar: URI of the job jar provisioned on Cloud Storage. (use this or
- the main_class, not both together).
- :type main_jar: string
- :param main_class: Name of the job class. (use this or the main_jar, not both
- together).
- :type main_class: string
- :param arguments: Arguments for the job.
- :type arguments: list
- :param archives: List of archived files that will be unpacked in the work
- directory. Should be stored in Cloud Storage.
- :type archives: list
- :param files: List of files to be copied to the working directory
- :type files: list
- :param job_name: The job name used in the DataProc cluster. This name by default
- is the task_id appended with the execution data, but can be templated. The
- name will always be appended with a random number to avoid name clashes.
- :type job_name: string
- :param cluster_name: The name of the DataProc cluster.
- :type cluster_name: string
- :param dataproc_spark_properties: Map for the Pig properties. Ideal to put in
- default arguments
- :type dataproc_spark_properties: dict
- :param dataproc_spark_jars: URIs to jars provisioned in Cloud Storage (example:
- for UDFs and libs) and are ideal to put in default arguments.
- :type dataproc_spark_jars: list
- :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
- :type gcp_conn_id: string
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have domain-wide
- delegation enabled.
- :type delegate_to: string
- :param region: The specified region where the dataproc cluster is created.
- :type region: string
- """
super(DataProcSparkOperator, self).__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
@@ -768,6 +752,40 @@ class DataProcSparkOperator(BaseOperator):
class DataProcHadoopOperator(BaseOperator):
"""
Start a Hadoop Job on a Cloud DataProc cluster.
+
+ :param main_jar: URI of the job jar provisioned on Cloud Storage. (use this or
+ the main_class, not both together).
+ :type main_jar: string
+ :param main_class: Name of the job class. (use this or the main_jar, not both
+ together).
+ :type main_class: string
+ :param arguments: Arguments for the job.
+ :type arguments: list
+ :param archives: List of archived files that will be unpacked in the work
+ directory. Should be stored in Cloud Storage.
+ :type archives: list
+ :param files: List of files to be copied to the working directory
+ :type files: list
+ :param job_name: The job name used in the DataProc cluster. By default this name
+ is the task_id appended with the execution date, but it can be templated. The
+ name will always be appended with a random number to avoid name clashes.
+ :type job_name: string
+ :param cluster_name: The name of the DataProc cluster.
+ :type cluster_name: string
+ :param dataproc_hadoop_properties: Map for the Hadoop properties. Ideal to put in
+ default arguments
+ :type dataproc_hadoop_properties: dict
+ :param dataproc_hadoop_jars: URIs to jars provisioned in Cloud Storage (example:
+ for UDFs and libs) and are ideal to put in default arguments.
+ :type dataproc_hadoop_jars: list
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have domain-wide
+ delegation enabled.
+ :type delegate_to: string
+ :param region: The specified region where the dataproc cluster is created.
+ :type region: string
"""
template_fields = ['arguments', 'job_name', 'cluster_name']
@@ -790,43 +808,7 @@ class DataProcHadoopOperator(BaseOperator):
region='global',
*args,
**kwargs):
- """
- Create a new DataProcHadoopOperator.
- :param main_jar: URI of the job jar provisioned on Cloud Storage. (use this or
- the main_class, not both together).
- :type main_jar: string
- :param main_class: Name of the job class. (use this or the main_jar, not both
- together).
- :type main_class: string
- :param arguments: Arguments for the job.
- :type arguments: list
- :param archives: List of archived files that will be unpacked in the work
- directory. Should be stored in Cloud Storage.
- :type archives: list
- :param files: List of files to be copied to the working directory
- :type files: list
- :param job_name: The job name used in the DataProc cluster. This name by default
- is the task_id appended with the execution data, but can be templated. The
- name will always be appended with a random number to avoid name clashes.
- :type job_name: string
- :param cluster_name: The name of the DataProc cluster.
- :type cluster_name: string
- :param dataproc_hadoop_properties: Map for the Pig properties. Ideal to put in
- default arguments
- :type dataproc_hadoop_properties: dict
- :param dataproc_hadoop_jars: URIs to jars provisioned in Cloud Storage (example:
- for UDFs and libs) and are ideal to put in default arguments.
- :type dataproc_hadoop_jars: list
- :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
- :type gcp_conn_id: string
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have domain-wide
- delegation enabled.
- :type delegate_to: string
- :param region: The specified region where the dataproc cluster is created.
- :type region: string
- """
super(DataProcHadoopOperator, self).__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
@@ -860,6 +842,40 @@ class DataProcHadoopOperator(BaseOperator):
class DataProcPySparkOperator(BaseOperator):
"""
Start a PySpark Job on a Cloud DataProc cluster.
+
+ :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
+ Python file to use as the driver. Must be a .py file.
+ :type main: string
+ :param arguments: Arguments for the job.
+ :type arguments: list
+ :param archives: List of archived files that will be unpacked in the work
+ directory. Should be stored in Cloud Storage.
+ :type archives: list
+ :param files: List of files to be copied to the working directory
+ :type files: list
+ :param pyfiles: List of Python files to pass to the PySpark framework.
+ Supported file types: .py, .egg, and .zip
+ :type pyfiles: list
+ :param job_name: The job name used in the DataProc cluster. By default this name
+ is the task_id appended with the execution date, but it can be templated. The
+ name will always be appended with a random number to avoid name clashes.
+ :type job_name: string
+ :param cluster_name: The name of the DataProc cluster.
+ :type cluster_name: string
+ :param dataproc_pyspark_properties: Map for the PySpark properties. Ideal to put in
+ default arguments
+ :type dataproc_pyspark_properties: dict
+ :param dataproc_pyspark_jars: URIs to jars provisioned in Cloud Storage (example:
+ for UDFs and libs) and are ideal to put in default arguments.
+ :type dataproc_pyspark_jars: list
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+ :type gcp_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have
+ domain-wide delegation enabled.
+ :type delegate_to: string
+ :param region: The specified region where the dataproc cluster is created.
+ :type region: string
"""
template_fields = ['arguments', 'job_name', 'cluster_name']
@@ -882,43 +898,7 @@ class DataProcPySparkOperator(BaseOperator):
region='global',
*args,
**kwargs):
- """
- Create a new DataProcPySparkOperator.
- :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
- Python file to use as the driver. Must be a .py file.
- :type main: string
- :param arguments: Arguments for the job.
- :type arguments: list
- :param archives: List of archived files that will be unpacked in the work
- directory. Should be stored in Cloud Storage.
- :type archives: list
- :param files: List of files to be copied to the working directory
- :type files: list
- :param pyfiles: List of Python files to pass to the PySpark framework.
- Supported file types: .py, .egg, and .zip
- :type pyfiles: list
- :param job_name: The job name used in the DataProc cluster. This name by default
- is the task_id appended with the execution data, but can be templated. The
- name will always be appended with a random number to avoid name clashes.
- :type job_name: string
- :param cluster_name: The name of the DataProc cluster.
- :type cluster_name: string
- :param dataproc_pyspark_properties: Map for the Pig properties. Ideal to put in
- default arguments
- :type dataproc_pyspark_properties: dict
- :param dataproc_pyspark_jars: URIs to jars provisioned in Cloud Storage (example:
- for UDFs and libs) and are ideal to put in default arguments.
- :type dataproc_pyspark_jars: list
- :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
- :type gcp_conn_id: string
- :param delegate_to: The account to impersonate, if any.
- For this to work, the service account making the request must have
- domain-wide delegation enabled.
- :type delegate_to: string
- :param region: The specified region where the dataproc cluster is created.
- :type region: string
- """
super(DataProcPySparkOperator, self).__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b48bbbd6/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 75302b6..7625bbe 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -23,6 +23,77 @@ from airflow.utils.decorators import apply_defaults
class GoogleCloudStorageToBigQueryOperator(BaseOperator):
"""
Loads files from Google cloud storage into BigQuery.
+
+ The schema to be used for the BigQuery table may be specified in one of
+ two ways. You may either directly pass the schema fields in, or you may
+ point the operator to a Google cloud storage object name. The object in
+ Google cloud storage must be a JSON file with the schema fields in it.
+
+ :param bucket: The bucket to load from.
+ :type bucket: string
+ :param source_objects: List of Google cloud storage URIs to load from.
+ If source_format is 'DATASTORE_BACKUP', the list must only contain a single URI.
+ :type source_objects: list
+ :param destination_project_dataset_table: The dotted (<project>.)<dataset>.<table>
+ BigQuery table to load data into. If <project> is not included, project will
+ be the project defined in the connection json.
+ :type destination_project_dataset_table: string
+ :param schema_fields: If set, the schema field list as defined here:
+ https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
+ Should not be set when source_format is 'DATASTORE_BACKUP'.
+ :type schema_fields: list
+ :param schema_object: If set, a GCS object path pointing to a .json file that
+ contains the schema for the table.
+ :type schema_object: string
+ :param source_format: File format to export.
+ :type source_format: string
+ :param create_disposition: The create disposition if the table doesn't exist.
+ :type create_disposition: string
+ :param skip_leading_rows: Number of rows to skip when loading from a CSV.
+ :type skip_leading_rows: int
+ :param write_disposition: The write disposition if the table already exists.
+ :type write_disposition: string
+ :param field_delimiter: The delimiter to use when loading from a CSV.
+ :type field_delimiter: string
+ :param max_bad_records: The maximum number of bad records that BigQuery can
+ ignore when running the job.
+ :type max_bad_records: int
+ :param quote_character: The value that is used to quote data sections in a CSV file.
+ :type quote_character: string
+ :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false).
+ :type allow_quoted_newlines: boolean
+ :param allow_jagged_rows: Accept rows that are missing trailing optional columns.
+ The missing values are treated as nulls. If false, records with missing trailing
+ columns are treated as bad records, and if there are too many bad records, an
+ invalid error is returned in the job result. Only applicable to CSV, ignored
+ for other formats.
+ :type allow_jagged_rows: bool
+ :param max_id_key: If set, the name of a column in the BigQuery table
+ that's to be loaded. This will be used to select the MAX value from
+ BigQuery after the load occurs. The results will be returned by the
+ execute() command, which in turn gets stored in XCom for future
+ operators to use. This can be helpful with incremental loads--during
+ future executions, you can pick up from the max ID.
+ :type max_id_key: string
+ :param bigquery_conn_id: Reference to a specific BigQuery hook.
+ :type bigquery_conn_id: string
+ :param google_cloud_storage_conn_id: Reference to a specific Google
+ cloud storage hook.
+ :type google_cloud_storage_conn_id: string
+ :param delegate_to: The account to impersonate, if any. For this to
+ work, the service account making the request must have domain-wide
+ delegation enabled.
+ :type delegate_to: string
+ :param schema_update_options: Allows the schema of the destination
+ table to be updated as a side effect of the load job.
+ :type schema_update_options: list
+ :param src_fmt_configs: Configure optional fields specific to the source format
+ :type src_fmt_configs: dict
+ :param time_partitioning: Configure optional time partitioning fields, i.e.
+ partition by field, type, and expiration as per API specifications.
+ Note that 'field' is not available in conjunction with
+ dataset.table$partition.
+ :type time_partitioning: dict
"""
template_fields = ('bucket', 'source_objects',
'schema_object', 'destination_project_dataset_table')
@@ -30,102 +101,30 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
ui_color = '#f0eee4'
@apply_defaults
- def __init__(
- self,
- bucket,
- source_objects,
- destination_project_dataset_table,
- schema_fields=None,
- schema_object=None,
- source_format='CSV',
- create_disposition='CREATE_IF_NEEDED',
- skip_leading_rows=0,
- write_disposition='WRITE_EMPTY',
- field_delimiter=',',
- max_bad_records=0,
- quote_character=None,
- allow_quoted_newlines=False,
- allow_jagged_rows=False,
- max_id_key=None,
- bigquery_conn_id='bigquery_default',
- google_cloud_storage_conn_id='google_cloud_storage_default',
- delegate_to=None,
- schema_update_options=(),
- src_fmt_configs={},
- time_partitioning={},
- *args,
- **kwargs):
- """
- The schema to be used for the BigQuery table may be specified in one of
- two ways. You may either directly pass the schema fields in, or you may
- point the operator to a Google cloud storage object name. The object in
- Google cloud storage must be a JSON file with the schema fields in it.
-
- :param bucket: The bucket to load from.
- :type bucket: string
- :param source_objects: List of Google cloud storage URIs to load from.
- If source_format is 'DATASTORE_BACKUP', the list must only contain a single URI.
- :type object: list
- :param destination_project_dataset_table: The dotted (<project>.)<dataset>.<table>
- BigQuery table to load data into. If <project> is not included, project will
- be the project defined in the connection json.
- :type destination_project_dataset_table: string
- :param schema_fields: If set, the schema field list as defined here:
- https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
- Should not be set when source_format is 'DATASTORE_BACKUP'.
- :type schema_fields: list
- :param schema_object: If set, a GCS object path pointing to a .json file that
- contains the schema for the table.
- :param schema_object: string
- :param source_format: File format to export.
- :type source_format: string
- :param create_disposition: The create disposition if the table doesn't exist.
- :type create_disposition: string
- :param skip_leading_rows: Number of rows to skip when loading from a CSV.
- :type skip_leading_rows: int
- :param write_disposition: The write disposition if the table already exists.
- :type write_disposition: string
- :param field_delimiter: The delimiter to use when loading from a CSV.
- :type field_delimiter: string
- :param max_bad_records: The maximum number of bad records that BigQuery can
- ignore when running the job.
- :type max_bad_records: int
- :param quote_character: The value that is used to quote data sections in a CSV file.
- :type quote_character: string
- :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false).
- :type allow_quoted_newlines: boolean
- :param allow_jagged_rows: Accept rows that are missing trailing optional columns.
- The missing values are treated as nulls. If false, records with missing trailing columns
- are treated as bad records, and if there are too many bad records, an invalid error is
- returned in the job result. Only applicable to CSV, ignored for other formats.
- :type allow_jagged_rows: bool
- :param max_id_key: If set, the name of a column in the BigQuery table
- that's to be loaded. Thsi will be used to select the MAX value from
- BigQuery after the load occurs. The results will be returned by the
- execute() command, which in turn gets stored in XCom for future
- operators to use. This can be helpful with incremental loads--during
- future executions, you can pick up from the max ID.
- :type max_id_key: string
- :param bigquery_conn_id: Reference to a specific BigQuery hook.
- :type bigquery_conn_id: string
- :param google_cloud_storage_conn_id: Reference to a specific Google
- cloud storage hook.
- :type google_cloud_storage_conn_id: string
- :param delegate_to: The account to impersonate, if any. For this to
- work, the service account making the request must have domain-wide
- delegation enabled.
- :type delegate_to: string
- :param schema_update_options: Allows the schema of the desitination
- table to be updated as a side effect of the load job.
- :type schema_update_options: list
- :param src_fmt_configs: configure optional fields specific to the source format
- :type src_fmt_configs: dict
- :param time_partitioning: configure optional time partitioning fields i.e.
- partition by field, type and expiration as per API specifications.
- Note that 'field' is not available in concurrency with
- dataset.table$partition.
- :type time_partitioning: dict
- """
+ def __init__(self,
+ bucket,
+ source_objects,
+ destination_project_dataset_table,
+ schema_fields=None,
+ schema_object=None,
+ source_format='CSV',
+ create_disposition='CREATE_IF_NEEDED',
+ skip_leading_rows=0,
+ write_disposition='WRITE_EMPTY',
+ field_delimiter=',',
+ max_bad_records=0,
+ quote_character=None,
+ allow_quoted_newlines=False,
+ allow_jagged_rows=False,
+ max_id_key=None,
+ bigquery_conn_id='bigquery_default',
+ google_cloud_storage_conn_id='google_cloud_storage_default',
+ delegate_to=None,
+ schema_update_options=(),
+ src_fmt_configs={},
+ time_partitioning={},
+ *args, **kwargs):
+
super(GoogleCloudStorageToBigQueryOperator, self).__init__(*args, **kwargs)
# GCS config