Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/27 02:17:39 UTC
[1/2] beam git commit: Stage the pipeline in Python DataflowRunner
Repository: beam
Updated Branches:
refs/heads/master 0c2211375 -> 1f6803078
Stage the pipeline in Python DataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d59c96e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d59c96e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d59c96e
Branch: refs/heads/master
Commit: 7d59c96e8031c39fdf43e022291ae25ac50b39e6
Parents: d5aff5d
Author: Kenneth Knowles <ke...@apache.org>
Authored: Wed Oct 18 13:56:28 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Oct 25 20:17:09 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/dataflow_runner.py | 5 ++-
.../runners/dataflow/internal/apiclient.py | 23 +++++++++++--
.../runners/dataflow/internal/apiclient_test.py | 36 ++++++++++++++------
.../runners/dataflow/internal/names.py | 2 ++
.../runners/dataflow/template_runner_test.py | 3 +-
sdks/python/apache_beam/transforms/core.py | 3 +-
6 files changed, 57 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7d59c96e/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index b7d89f1..6253c80 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -272,6 +272,9 @@ class DataflowRunner(PipelineRunner):
'Google Cloud Dataflow runner not available, '
'please install apache_beam[gcp]')
+ # Snapshot the pipeline in a portable proto before mutating it
+ proto_pipeline = pipeline.to_runner_api()
+
# Performing configured PTransform overrides.
pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)
@@ -282,7 +285,7 @@ class DataflowRunner(PipelineRunner):
plugins = list(set(plugins + setup_options.beam_plugins))
setup_options.beam_plugins = plugins
- self.job = apiclient.Job(pipeline._options)
+ self.job = apiclient.Job(pipeline._options, proto_pipeline)
# Dataflow runner requires a KV type for GBK inputs, hence we enforce that
# here.
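
For context on the ordering above, a minimal sketch (not part of this commit) of why the snapshot must precede replace_all: the PTransform overrides mutate the pipeline graph in place, so serializing afterwards would stage the rewritten graph rather than the one the user authored. Assumes apache_beam is installed; the pipeline contents are hypothetical.

    import apache_beam as beam

    p = beam.Pipeline()
    _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)

    # to_runner_api() serializes the graph as it stands right now, so any
    # later in-place mutation (such as runner-specific overrides) does not
    # leak into the staged proto.
    proto_pipeline = p.to_runner_api()
    print(type(proto_pipeline).__name__)  # Pipeline (beam_runner_api_pb2)
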
http://git-wip-us.apache.org/repos/asf/beam/blob/7d59c96e/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index eec598a..d225503 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -41,6 +41,7 @@ from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.runners.dataflow.internal import dependency
+from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.dataflow.internal.clients import dataflow
from apache_beam.runners.dataflow.internal.dependency import get_sdk_name_and_version
from apache_beam.runners.dataflow.internal.names import PropertyNames
@@ -118,11 +119,12 @@ class Step(object):
class Environment(object):
"""Wrapper for a dataflow Environment protobuf."""
- def __init__(self, packages, options, environment_version):
+ def __init__(self, packages, options, environment_version, pipeline_url):
self.standard_options = options.view_as(StandardOptions)
self.google_cloud_options = options.view_as(GoogleCloudOptions)
self.worker_options = options.view_as(WorkerOptions)
self.debug_options = options.view_as(DebugOptions)
+ self.pipeline_url = pipeline_url
self.proto = dataflow.Environment()
self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE
self.proto.dataset = '{}/cloud_dataflow'.format(
@@ -188,10 +190,16 @@ class Environment(object):
pool = dataflow.WorkerPool(
kind='local' if self.local else 'harness',
packages=package_descriptors,
+ metadata=dataflow.WorkerPool.MetadataValue(),
taskrunnerSettings=dataflow.TaskRunnerSettings(
parallelWorkerSettings=dataflow.WorkerSettings(
baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT,
servicePath=self.google_cloud_options.dataflow_endpoint)))
+
+ pool.metadata.additionalProperties.append(
+ dataflow.WorkerPool.MetadataValue.AdditionalProperty(
+ key=names.STAGED_PIPELINE_URL_METADATA_FIELD, value=pipeline_url))
+
pool.autoscalingSettings = dataflow.AutoscalingSettings()
# Set worker pool options received through command line.
if self.worker_options.num_workers:
@@ -323,8 +331,9 @@ class Job(object):
job_name = Job._build_default_job_name(getpass.getuser())
return job_name
- def __init__(self, options):
+ def __init__(self, options, proto_pipeline):
self.options = options
+ self.proto_pipeline = proto_pipeline
self.google_cloud_options = options.view_as(GoogleCloudOptions)
if not self.google_cloud_options.job_name:
self.google_cloud_options.job_name = self.default_job_name(
@@ -475,9 +484,19 @@ class DataflowApplicationClient(object):
def create_job_description(self, job):
"""Creates a job described by the workflow proto."""
+
+ # Stage the pipeline for the runner harness
+ self.stage_file(job.google_cloud_options.staging_location,
+ names.STAGED_PIPELINE_FILENAME,
+ StringIO(job.proto_pipeline.SerializeToString()))
+
+ # Stage other resources for the SDK harness
resources = dependency.stage_job_resources(
job.options, file_copy=self._gcs_file_copy)
+
job.proto.environment = Environment(
+ pipeline_url=FileSystems.join(job.google_cloud_options.staging_location,
+ names.STAGED_PIPELINE_FILENAME),
packages=resources, options=job.options,
environment_version=self.environment_version).proto
logging.debug('JOB: %s', job)
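
As a rough illustration of the staging step this hunk adds: the staged URL is simply the staging location joined with the fixed file name from names.py, and the bytes written to it are the serialized runner API proto. The bucket below is a hypothetical placeholder; in the runner the location comes from GoogleCloudOptions.staging_location.

    from apache_beam.io.filesystems import FileSystems
    from apache_beam.runners.dataflow.internal import names

    staging_location = 'gs://my-bucket/staging'  # hypothetical
    pipeline_url = FileSystems.join(staging_location,
                                    names.STAGED_PIPELINE_FILENAME)
    print(pipeline_url)  # gs://my-bucket/staging/pipeline.pb
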
http://git-wip-us.apache.org/repos/asf/beam/blob/7d59c96e/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 79cbd1c..f8a4471 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -33,6 +33,8 @@ except ImportError:
apiclient = None
# pylint: enable=wrong-import-order, wrong-import-position
+FAKE_PIPELINE_URL = "gs://invalid-bucket/anywhere"
+
@unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
class UtilTest(unittest.TestCase):
@@ -48,7 +50,8 @@ class UtilTest(unittest.TestCase):
'--temp_location', 'gs://any-location/temp'])
env = apiclient.Environment([], #packages
pipeline_options,
- '2.0.0') #any environment version
+ '2.0.0', #any environment version
+ FAKE_PIPELINE_URL)
self.assertEqual(env.proto.workerPools[0].network,
'anetworkname')
@@ -59,7 +62,8 @@ class UtilTest(unittest.TestCase):
env = apiclient.Environment([], #packages
pipeline_options,
- '2.0.0') #any environment version
+ '2.0.0', #any environment version
+ FAKE_PIPELINE_URL)
self.assertEqual(env.proto.workerPools[0].subnetwork,
'/regions/MY/subnetworks/SUBNETWORK')
@@ -123,14 +127,20 @@ class UtilTest(unittest.TestCase):
def test_default_ip_configuration(self):
pipeline_options = PipelineOptions(
['--temp_location', 'gs://any-location/temp'])
- env = apiclient.Environment([], pipeline_options, '2.0.0')
+ env = apiclient.Environment([],
+ pipeline_options,
+ '2.0.0',
+ FAKE_PIPELINE_URL)
self.assertEqual(env.proto.workerPools[0].ipConfiguration, None)
def test_public_ip_configuration(self):
pipeline_options = PipelineOptions(
['--temp_location', 'gs://any-location/temp',
'--use_public_ips'])
- env = apiclient.Environment([], pipeline_options, '2.0.0')
+ env = apiclient.Environment([],
+ pipeline_options,
+ '2.0.0',
+ FAKE_PIPELINE_URL)
self.assertEqual(
env.proto.workerPools[0].ipConfiguration,
dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC)
@@ -139,7 +149,10 @@ class UtilTest(unittest.TestCase):
pipeline_options = PipelineOptions(
['--temp_location', 'gs://any-location/temp',
'--no_use_public_ips'])
- env = apiclient.Environment([], pipeline_options, '2.0.0')
+ env = apiclient.Environment([],
+ pipeline_options,
+ '2.0.0',
+ FAKE_PIPELINE_URL)
self.assertEqual(
env.proto.workerPools[0].ipConfiguration,
dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE)
@@ -158,7 +171,8 @@ class UtilTest(unittest.TestCase):
mock.MagicMock(return_value=distribution)):
env = apiclient.Environment([], #packages
pipeline_options,
- '2.0.0') #any environment version
+ '2.0.0',
+ FAKE_PIPELINE_URL) #any environment version
self.assertIn(override, env.proto.experiments)
@mock.patch('apache_beam.runners.dataflow.internal.dependency.'
@@ -176,7 +190,8 @@ class UtilTest(unittest.TestCase):
mock.Mock(side_effect=pkg_resources.DistributionNotFound())):
env = apiclient.Environment([], #packages
pipeline_options,
- '2.0.0') #any environment version
+ '2.0.0',
+ FAKE_PIPELINE_URL) #any environment version
self.assertIn(override, env.proto.experiments)
@mock.patch('apache_beam.runners.dataflow.internal.dependency.'
@@ -190,7 +205,8 @@ class UtilTest(unittest.TestCase):
mock.Mock(side_effect=pkg_resources.DistributionNotFound())):
env = apiclient.Environment([], #packages
pipeline_options,
- '2.0.0') #any environment version
+ '2.0.0',
+ FAKE_PIPELINE_URL) #any environment version
if env.proto.experiments:
for experiment in env.proto.experiments:
self.assertNotIn('runner_harness_container_image=', experiment)
@@ -199,7 +215,7 @@ class UtilTest(unittest.TestCase):
pipeline_options = PipelineOptions(
['--project', 'test_project', '--job_name', 'test_job_name',
'--temp_location', 'gs://test-location/temp'])
- job = apiclient.Job(pipeline_options)
+ job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
self.assertIsNone(job.proto.labels)
pipeline_options = PipelineOptions(
@@ -210,7 +226,7 @@ class UtilTest(unittest.TestCase):
'--label', 'key3=value3',
'--labels', 'key4=value4',
'--labels', 'key5'])
- job = apiclient.Job(pipeline_options)
+ job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
self.assertEqual(5, len(job.proto.labels.additionalProperties))
self.assertEqual('key1', job.proto.labels.additionalProperties[0].key)
self.assertEqual('value1', job.proto.labels.additionalProperties[0].value)
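
Taken together, the updated tests pin down the new constructor shape. A condensed sketch of an Environment call with the added pipeline_url argument (option values as in the tests, URL deliberately fake; requires the GCP extras):

    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.dataflow.internal import apiclient

    env = apiclient.Environment(
        [],                               # packages
        PipelineOptions(['--temp_location', 'gs://any-location/temp']),
        '2.0.0',                          # any environment version
        'gs://invalid-bucket/anywhere')   # pipeline_url
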
http://git-wip-us.apache.org/repos/asf/beam/blob/7d59c96e/sdks/python/apache_beam/runners/dataflow/internal/names.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index be67224..559b445 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -21,6 +21,8 @@
# Standard file names used for staging files.
PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
+STAGED_PIPELINE_FILENAME = "pipeline.pb"
+STAGED_PIPELINE_URL_METADATA_FIELD = "pipeline_url"
# String constants related to sources framework
SOURCE_FORMAT = 'custom_source'
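
These two constants feed the worker pool metadata added in apiclient.py. A sketch of the resulting metadata entry, mirroring the Environment hunk above (the URL is a hypothetical placeholder; requires the GCP extras):

    from apache_beam.runners.dataflow.internal import names
    from apache_beam.runners.dataflow.internal.clients import dataflow

    metadata = dataflow.WorkerPool.MetadataValue()
    metadata.additionalProperties.append(
        dataflow.WorkerPool.MetadataValue.AdditionalProperty(
            key=names.STAGED_PIPELINE_URL_METADATA_FIELD,  # 'pipeline_url'
            value='gs://my-bucket/staging/pipeline.pb'))   # hypothetical
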
http://git-wip-us.apache.org/repos/asf/beam/blob/7d59c96e/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
index 88afe8a..82eb76b 100644
--- a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
@@ -87,7 +87,8 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
'--temp_location=/dev/null',
'--template_location=/bad/path',
'--no_auth=True']))
- remote_runner.job = apiclient.Job(pipeline._options)
+ remote_runner.job = apiclient.Job(pipeline._options,
+ pipeline.to_runner_api())
with self.assertRaises(IOError):
pipeline.run().wait_until_finish()
http://git-wip-us.apache.org/repos/asf/beam/blob/7d59c96e/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index fbc5f33..1c05e97 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -847,7 +847,8 @@ class ParDo(PTransformWithSideInputs):
return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
def to_runner_api_parameter(self, context):
- assert self.__class__ is ParDo
+ assert isinstance(self, ParDo), \
+ "expected instance of ParDo, but got %s" % self.__class__
picked_pardo_fn_data = pickler.dumps(self._pardo_fn_data())
return (
urns.PARDO_TRANSFORM,
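
The relaxed assertion matters once ParDo is subclassed (for example by runner-internal overrides): the old exact-class check rejected every subclass, while isinstance accepts them. A small sketch with a hypothetical subclass:

    import apache_beam as beam

    class _IdentityDoFn(beam.DoFn):
        def process(self, element):
            yield element

    class _WrappedParDo(beam.ParDo):  # hypothetical subclass
        pass

    pardo = _WrappedParDo(_IdentityDoFn())
    assert pardo.__class__ is not beam.ParDo  # old check would have failed here
    assert isinstance(pardo, beam.ParDo)      # new check passes for subclasses
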
[2/2] beam git commit: This closes #4010: [BEAM-3074] Stage the pipeline in Python DataflowRunner
Posted by ke...@apache.org.
This closes #4010: [BEAM-3074] Stage the pipeline in Python DataflowRunner
Stage the pipeline in Python DataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f680307
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f680307
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f680307
Branch: refs/heads/master
Commit: 1f680307889e03fae3746585c6baf963b2bdf9ba
Parents: 0c22113 7d59c96
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 26 18:43:14 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 26 18:43:14 2017 -0700