You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/24 04:35:16 UTC
[33/50] [abbrv] beam git commit: Adding support for subnetwork in
Python Pipelineoptions
Adding support for subnetwork in Python Pipelineoptions
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/902f27a3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/902f27a3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/902f27a3
Branch: refs/heads/jstorm-runner
Commit: 902f27a34cf65923823d10c0edf11ba27e883c30
Parents: aad8555
Author: Pablo <pa...@google.com>
Authored: Mon May 15 15:47:18 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon May 15 21:32:46 2017 -0700
----------------------------------------------------------------------
.../apache_beam/options/pipeline_options.py | 11 +++++++++-
.../runners/dataflow/internal/apiclient.py | 2 ++
.../runners/dataflow/internal/apiclient_test.py | 21 ++++++++++++++++++++
3 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/902f27a3/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 983d128..777926a 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -169,7 +169,7 @@ class PipelineOptions(HasDisplayData):
"""Returns a PipelineOptions from a dictionary of arguments.
Args:
- options: Dictinary of argument value pairs.
+ options: Dictionary of argument value pairs.
Returns:
A PipelineOptions object representing the given arguments.
@@ -455,6 +455,15 @@ class WorkerOptions(PipelineOptions):
'GCE network for launching workers. Default is up to the Dataflow '
'service.'))
parser.add_argument(
+ '--subnetwork',
+ default=None,
+ help=(
+ 'GCE subnetwork for launching workers. Default is up to the '
+ 'Dataflow service. Expected format is '
+ 'regions/REGION/subnetworks/SUBNETWORK or the fully qualified '
+ 'subnetwork name. For more information, see '
+ 'https://cloud.google.com/compute/docs/vpc/'))
+ parser.add_argument(
'--worker_harness_container_image',
default=None,
help=('Docker registry location of container image to use for the '
http://git-wip-us.apache.org/repos/asf/beam/blob/902f27a3/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index ea49593..bfdd5e4 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -199,6 +199,8 @@ class Environment(object):
pool.zone = self.worker_options.zone
if self.worker_options.network:
pool.network = self.worker_options.network
+ if self.worker_options.subnetwork:
+ pool.subnetwork = self.worker_options.subnetwork
if self.worker_options.worker_harness_container_image:
pool.workerHarnessContainerImage = (
self.worker_options.worker_harness_container_image)
http://git-wip-us.apache.org/repos/asf/beam/blob/902f27a3/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 6ed1fb4..67cf77f 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -44,6 +44,27 @@ class UtilTest(unittest.TestCase):
pipeline_options,
DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION)
+ def test_set_network(self):
+ pipeline_options = PipelineOptions(
+ ['--network', 'anetworkname',
+ '--temp_location', 'gs://any-location/temp'])
+ env = apiclient.Environment([], #packages
+ pipeline_options,
+ '2.0.0') #any environment version
+ self.assertEqual(env.proto.workerPools[0].network,
+ 'anetworkname')
+
+ def test_set_subnetwork(self):
+ pipeline_options = PipelineOptions(
+ ['--subnetwork', '/regions/MY/subnetworks/SUBNETWORK',
+ '--temp_location', 'gs://any-location/temp'])
+
+ env = apiclient.Environment([], #packages
+ pipeline_options,
+ '2.0.0') #any environment version
+ self.assertEqual(env.proto.workerPools[0].subnetwork,
+ '/regions/MY/subnetworks/SUBNETWORK')
+
def test_invalid_default_job_name(self):
# Regexp for job names in dataflow.
regexp = '^[a-z]([-a-z0-9]{0,61}[a-z0-9])?$'