You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/10/20 17:32:56 UTC

incubator-airflow git commit: [AIRFLOW-1728] Add networkUri, subnet, tags to Dataproc operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6372770be -> eb012a3c8


[AIRFLOW-1728] Add networkUri, subnet, tags to Dataproc operator

Closes #2706 from jfantom/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb012a3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb012a3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb012a3c

Branch: refs/heads/master
Commit: eb012a3c8daecdbe20c13958468fbd21ffbbbe3e
Parents: 6372770
Author: jfantom <pa...@gmail.com>
Authored: Fri Oct 20 10:32:08 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Oct 20 10:32:13 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py  | 20 ++++++++++++++++++++
 .../contrib/operators/test_dataproc_operator.py | 15 +++++++++++++++
 2 files changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb012a3c/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 99e4a0d..ba2c601 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -43,6 +43,9 @@ class DataprocClusterCreateOperator(BaseOperator):
                  project_id,
                  num_workers,
                  zone,
+                 network_uri=None,
+                 subnetwork_uri=None,
+                 tags=None,
                  storage_bucket=None,
                  init_actions_uris=None,
                  metadata=None,
@@ -105,6 +108,14 @@ class DataprocClusterCreateOperator(BaseOperator):
         :type labels: dict
         :param zone: The zone where the cluster will be located
         :type zone: string
+        :param network_uri: The network uri to be used for machine communication, cannot be
+            specified with subnetwork_uri
+        :type network_uri: string
+        :param subnetwork_uri: The subnetwork uri to be used for machine communication, cannot be
+            specified with network_uri
+        :type subnetwork_uri: string
+        :param tags: The GCE tags to add to all instances
+        :type tags: list[string]
         :param region: leave as 'global', might become relevant in the future
         :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
         :type gcp_conn_id: string
@@ -135,6 +146,9 @@ class DataprocClusterCreateOperator(BaseOperator):
         self.worker_disk_size = worker_disk_size
         self.labels = labels
         self.zone = zone
+        self.network_uri = network_uri
+        self.subnetwork_uri = subnetwork_uri
+        self.tags = tags
         self.region = region
         self.service_account = service_account
         self.service_account_scopes = service_account_scopes
@@ -246,6 +260,12 @@ class DataprocClusterCreateOperator(BaseOperator):
             cluster_data['config']['configBucket'] = self.storage_bucket
         if self.metadata:
             cluster_data['config']['gceClusterConfig']['metadata'] = self.metadata
+        if self.network_uri:
+            cluster_data['config']['gceClusterConfig']['networkUri'] = self.network_uri
+        if self.subnetwork_uri:
+            cluster_data['config']['gceClusterConfig']['subnetworkUri'] = self.subnetwork_uri
+        if self.tags:
+            cluster_data['config']['gceClusterConfig']['tags'] = self.tags
         if self.image_version:
             cluster_data['config']['softwareConfig']['imageVersion'] = self.image_version
         if self.properties:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb012a3c/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index ad78a8d..2df056a 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -44,6 +44,9 @@ CLUSTER_NAME = 'test-cluster-name'
 PROJECT_ID = 'test-project-id'
 NUM_WORKERS = 123
 ZONE = 'us-central1-a'
+NETWORK_URI = '/projects/project_id/regions/global/net'
+SUBNETWORK_URI = '/projects/project_id/regions/global/subnet'
+TAGS = ['tag1', 'tag2']
 STORAGE_BUCKET = 'gs://airflow-test-bucket/'
 IMAGE_VERSION = '1.1'
 MASTER_MACHINE_TYPE = 'n1-standard-2'
@@ -76,6 +79,9 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                     project_id=PROJECT_ID,
                     num_workers=NUM_WORKERS,
                     zone=ZONE,
+                    network_uri=NETWORK_URI,
+                    subnetwork_uri=SUBNETWORK_URI,
+                    tags=TAGS,
                     storage_bucket=STORAGE_BUCKET,
                     image_version=IMAGE_VERSION,
                     master_machine_type=MASTER_MACHINE_TYPE,
@@ -103,6 +109,9 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
             self.assertEqual(dataproc_operator.project_id, PROJECT_ID)
             self.assertEqual(dataproc_operator.num_workers, NUM_WORKERS)
             self.assertEqual(dataproc_operator.zone, ZONE)
+            self.assertEqual(dataproc_operator.network_uri, NETWORK_URI)
+            self.assertEqual(dataproc_operator.subnetwork_uri, SUBNETWORK_URI)
+            self.assertEqual(dataproc_operator.tags, TAGS)
             self.assertEqual(dataproc_operator.storage_bucket, STORAGE_BUCKET)
             self.assertEqual(dataproc_operator.image_version, IMAGE_VERSION)
             self.assertEqual(dataproc_operator.master_machine_type, MASTER_MACHINE_TYPE)
@@ -125,6 +134,12 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                              NUM_PREEMPTIBLE_WORKERS)
             self.assertEqual(cluster_data['config']['gceClusterConfig']['serviceAccountScopes'],
                 SERVICE_ACCOUNT_SCOPES)
+            self.assertEqual(cluster_data['config']['gceClusterConfig']['subnetworkUri'],
+                SUBNETWORK_URI)
+            self.assertEqual(cluster_data['config']['gceClusterConfig']['networkUri'],
+                NETWORK_URI)
+            self.assertEqual(cluster_data['config']['gceClusterConfig']['tags'],
+                TAGS)
             # test whether the default airflow-version label has been properly
             # set to the dataproc operator.
             merged_labels = {}