You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/10/20 17:32:56 UTC
incubator-airflow git commit: [AIRFLOW-1728] Add networkUri, subnet, tags to Dataproc operator
Repository: incubator-airflow
Updated Branches:
refs/heads/master 6372770be -> eb012a3c8
[AIRFLOW-1728] Add networkUri, subnet, tags to Dataproc operator
Closes #2706 from jfantom/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb012a3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb012a3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb012a3c
Branch: refs/heads/master
Commit: eb012a3c8daecdbe20c13958468fbd21ffbbbe3e
Parents: 6372770
Author: jfantom <pa...@gmail.com>
Authored: Fri Oct 20 10:32:08 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Oct 20 10:32:13 2017 -0700
----------------------------------------------------------------------
airflow/contrib/operators/dataproc_operator.py | 20 ++++++++++++++++++++
.../contrib/operators/test_dataproc_operator.py | 15 +++++++++++++++
2 files changed, 35 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb012a3c/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 99e4a0d..ba2c601 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -43,6 +43,9 @@ class DataprocClusterCreateOperator(BaseOperator):
project_id,
num_workers,
zone,
+ network_uri=None,
+ subnetwork_uri=None,
+ tags=None,
storage_bucket=None,
init_actions_uris=None,
metadata=None,
@@ -105,6 +108,14 @@ class DataprocClusterCreateOperator(BaseOperator):
:type labels: dict
:param zone: The zone where the cluster will be located
:type zone: string
+ :param network_uri: The network uri to be used for machine communication, cannot be
+ specified with subnetwork_uri
+ :type network_uri: string
+ :param subnetwork_uri: The subnetwork uri to be used for machine communication, cannot be
+ specified with network_uri
+ :type subnetwork_uri: string
+ :param tags: The GCE tags to add to all instances
+ :type tags: list[string]
:param region: leave as 'global', might become relevant in the future
:param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
:type gcp_conn_id: string
@@ -135,6 +146,9 @@ class DataprocClusterCreateOperator(BaseOperator):
self.worker_disk_size = worker_disk_size
self.labels = labels
self.zone = zone
+ self.network_uri = network_uri
+ self.subnetwork_uri = subnetwork_uri
+ self.tags = tags
self.region = region
self.service_account = service_account
self.service_account_scopes = service_account_scopes
@@ -246,6 +260,12 @@ class DataprocClusterCreateOperator(BaseOperator):
cluster_data['config']['configBucket'] = self.storage_bucket
if self.metadata:
cluster_data['config']['gceClusterConfig']['metadata'] = self.metadata
+ if self.network_uri:
+ cluster_data['config']['gceClusterConfig']['networkUri'] = self.network_uri
+ if self.subnetwork_uri:
+ cluster_data['config']['gceClusterConfig']['subnetworkUri'] = self.subnetwork_uri
+ if self.tags:
+ cluster_data['config']['gceClusterConfig']['tags'] = self.tags
if self.image_version:
cluster_data['config']['softwareConfig']['imageVersion'] = self.image_version
if self.properties:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb012a3c/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index ad78a8d..2df056a 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -44,6 +44,9 @@ CLUSTER_NAME = 'test-cluster-name'
PROJECT_ID = 'test-project-id'
NUM_WORKERS = 123
ZONE = 'us-central1-a'
+NETWORK_URI = '/projects/project_id/regions/global/net'
+SUBNETWORK_URI = '/projects/project_id/regions/global/subnet'
+TAGS = ['tag1', 'tag2']
STORAGE_BUCKET = 'gs://airflow-test-bucket/'
IMAGE_VERSION = '1.1'
MASTER_MACHINE_TYPE = 'n1-standard-2'
@@ -76,6 +79,9 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
project_id=PROJECT_ID,
num_workers=NUM_WORKERS,
zone=ZONE,
+ network_uri=NETWORK_URI,
+ subnetwork_uri=SUBNETWORK_URI,
+ tags=TAGS,
storage_bucket=STORAGE_BUCKET,
image_version=IMAGE_VERSION,
master_machine_type=MASTER_MACHINE_TYPE,
@@ -103,6 +109,9 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
self.assertEqual(dataproc_operator.project_id, PROJECT_ID)
self.assertEqual(dataproc_operator.num_workers, NUM_WORKERS)
self.assertEqual(dataproc_operator.zone, ZONE)
+ self.assertEqual(dataproc_operator.network_uri, NETWORK_URI)
+ self.assertEqual(dataproc_operator.subnetwork_uri, SUBNETWORK_URI)
+ self.assertEqual(dataproc_operator.tags, TAGS)
self.assertEqual(dataproc_operator.storage_bucket, STORAGE_BUCKET)
self.assertEqual(dataproc_operator.image_version, IMAGE_VERSION)
self.assertEqual(dataproc_operator.master_machine_type, MASTER_MACHINE_TYPE)
@@ -125,6 +134,12 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
NUM_PREEMPTIBLE_WORKERS)
self.assertEqual(cluster_data['config']['gceClusterConfig']['serviceAccountScopes'],
SERVICE_ACCOUNT_SCOPES)
+ self.assertEqual(cluster_data['config']['gceClusterConfig']['subnetworkUri'],
+ SUBNETWORK_URI)
+ self.assertEqual(cluster_data['config']['gceClusterConfig']['networkUri'],
+ NETWORK_URI)
+ self.assertEqual(cluster_data['config']['gceClusterConfig']['tags'],
+ TAGS)
# test whether the default airflow-version label has been properly
# set to the dataproc operator.
merged_labels = {}