Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/27 02:17:39 UTC

[1/2] beam git commit: Stage the pipeline in Python DataflowRunner

Repository: beam
Updated Branches:
  refs/heads/master 0c2211375 -> 1f6803078


Stage the pipeline in Python DataflowRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d59c96e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d59c96e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d59c96e

Branch: refs/heads/master
Commit: 7d59c96e8031c39fdf43e022291ae25ac50b39e6
Parents: d5aff5d
Author: Kenneth Knowles <ke...@apache.org>
Authored: Wed Oct 18 13:56:28 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Oct 25 20:17:09 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_runner.py         |  5 ++-
 .../runners/dataflow/internal/apiclient.py      | 23 +++++++++++--
 .../runners/dataflow/internal/apiclient_test.py | 36 ++++++++++++++------
 .../runners/dataflow/internal/names.py          |  2 ++
 .../runners/dataflow/template_runner_test.py    |  3 +-
 sdks/python/apache_beam/transforms/core.py      |  3 +-
 6 files changed, 57 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7d59c96e/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index b7d89f1..6253c80 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -272,6 +272,9 @@ class DataflowRunner(PipelineRunner):
           'Google Cloud Dataflow runner not available, '
           'please install apache_beam[gcp]')
 
+    # Snapshot the pipeline in a portable proto before mutating it
+    proto_pipeline = pipeline.to_runner_api()
+
     # Performing configured PTransform overrides.
     pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)
 
@@ -282,7 +285,7 @@ class DataflowRunner(PipelineRunner):
       plugins = list(set(plugins + setup_options.beam_plugins))
     setup_options.beam_plugins = plugins
 
-    self.job = apiclient.Job(pipeline._options)
+    self.job = apiclient.Job(pipeline._options, proto_pipeline)
 
     # Dataflow runner requires a KV type for GBK inputs, hence we enforce that
     # here.
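
For readers following along, here is a minimal sketch of what the snapshot above captures (the pipeline below is illustrative, not from the commit; to_runner_api() is the SDK's own conversion of a Pipeline into the portable runner API proto):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Build a trivial pipeline purely for illustration.
    pipeline = beam.Pipeline(options=PipelineOptions())
    _ = (pipeline
         | 'Create' >> beam.Create([1, 2, 3])
         | 'Double' >> beam.Map(lambda x: x * 2))

    # Snapshot the graph into a portable proto *before* any runner-specific
    # PTransform overrides rewrite it, mirroring the ordering in the diff above.
    proto_pipeline = pipeline.to_runner_api()

    # The result is an ordinary protobuf message, so it can be serialized
    # and staged alongside the other job resources.
    serialized = proto_pipeline.SerializeToString()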

http://git-wip-us.apache.org/repos/asf/beam/blob/7d59c96e/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index eec598a..d225503 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -41,6 +41,7 @@ from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import WorkerOptions
 from apache_beam.runners.dataflow.internal import dependency
+from apache_beam.runners.dataflow.internal import names
 from apache_beam.runners.dataflow.internal.clients import dataflow
 from apache_beam.runners.dataflow.internal.dependency import get_sdk_name_and_version
 from apache_beam.runners.dataflow.internal.names import PropertyNames
@@ -118,11 +119,12 @@ class Step(object):
 class Environment(object):
   """Wrapper for a dataflow Environment protobuf."""
 
-  def __init__(self, packages, options, environment_version):
+  def __init__(self, packages, options, environment_version, pipeline_url):
     self.standard_options = options.view_as(StandardOptions)
     self.google_cloud_options = options.view_as(GoogleCloudOptions)
     self.worker_options = options.view_as(WorkerOptions)
     self.debug_options = options.view_as(DebugOptions)
+    self.pipeline_url = pipeline_url
     self.proto = dataflow.Environment()
     self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE
     self.proto.dataset = '{}/cloud_dataflow'.format(
@@ -188,10 +190,16 @@ class Environment(object):
     pool = dataflow.WorkerPool(
         kind='local' if self.local else 'harness',
         packages=package_descriptors,
+        metadata=dataflow.WorkerPool.MetadataValue(),
         taskrunnerSettings=dataflow.TaskRunnerSettings(
             parallelWorkerSettings=dataflow.WorkerSettings(
                 baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT,
                 servicePath=self.google_cloud_options.dataflow_endpoint)))
+
+    pool.metadata.additionalProperties.append(
+        dataflow.WorkerPool.MetadataValue.AdditionalProperty(
+            key=names.STAGED_PIPELINE_URL_METADATA_FIELD, value=pipeline_url))
+
     pool.autoscalingSettings = dataflow.AutoscalingSettings()
     # Set worker pool options received through command line.
     if self.worker_options.num_workers:
@@ -323,8 +331,9 @@ class Job(object):
       job_name = Job._build_default_job_name(getpass.getuser())
     return job_name
 
-  def __init__(self, options):
+  def __init__(self, options, proto_pipeline):
     self.options = options
+    self.proto_pipeline = proto_pipeline
     self.google_cloud_options = options.view_as(GoogleCloudOptions)
     if not self.google_cloud_options.job_name:
       self.google_cloud_options.job_name = self.default_job_name(
@@ -475,9 +484,19 @@ class DataflowApplicationClient(object):
 
   def create_job_description(self, job):
     """Creates a job described by the workflow proto."""
+
+    # Stage the pipeline for the runner harness
+    self.stage_file(job.google_cloud_options.staging_location,
+                    names.STAGED_PIPELINE_FILENAME,
+                    StringIO(job.proto_pipeline.SerializeToString()))
+
+    # Stage other resources for the SDK harness
     resources = dependency.stage_job_resources(
         job.options, file_copy=self._gcs_file_copy)
+
     job.proto.environment = Environment(
+        pipeline_url=FileSystems.join(job.google_cloud_options.staging_location,
+                                      names.STAGED_PIPELINE_FILENAME),
         packages=resources, options=job.options,
         environment_version=self.environment_version).proto
     logging.debug('JOB: %s', job)
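
In short, create_job_description() now uploads the serialized pipeline proto to the staging location and records its URL in the worker pool metadata under the "pipeline_url" key. A rough, hypothetical sketch of how that URL is composed (the bucket name is made up; the actual upload goes through the Dataflow client's stage_file(), not shown here):

    from apache_beam.io.filesystems import FileSystems

    STAGED_PIPELINE_FILENAME = 'pipeline.pb'          # mirrors names.py below
    staging_location = 'gs://example-bucket/staging'  # illustrative value only

    pipeline_url = FileSystems.join(staging_location, STAGED_PIPELINE_FILENAME)
    # -> 'gs://example-bucket/staging/pipeline.pb'

    # Writing the serialized proto to that location would look roughly like
    # this, assuming the GCS filesystem plugin is installed:
    # with FileSystems.create(pipeline_url) as f:
    #     f.write(serialized_proto_bytes)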

http://git-wip-us.apache.org/repos/asf/beam/blob/7d59c96e/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 79cbd1c..f8a4471 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -33,6 +33,8 @@ except ImportError:
   apiclient = None
 # pylint: enable=wrong-import-order, wrong-import-position
 
+FAKE_PIPELINE_URL = "gs://invalid-bucket/anywhere"
+
 
 @unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
 class UtilTest(unittest.TestCase):
@@ -48,7 +50,8 @@ class UtilTest(unittest.TestCase):
          '--temp_location', 'gs://any-location/temp'])
     env = apiclient.Environment([], #packages
                                 pipeline_options,
-                                '2.0.0') #any environment version
+                                '2.0.0', #any environment version
+                                FAKE_PIPELINE_URL)
     self.assertEqual(env.proto.workerPools[0].network,
                      'anetworkname')
 
@@ -59,7 +62,8 @@ class UtilTest(unittest.TestCase):
 
     env = apiclient.Environment([], #packages
                                 pipeline_options,
-                                '2.0.0') #any environment version
+                                '2.0.0', #any environment version
+                                FAKE_PIPELINE_URL)
     self.assertEqual(env.proto.workerPools[0].subnetwork,
                      '/regions/MY/subnetworks/SUBNETWORK')
 
@@ -123,14 +127,20 @@ class UtilTest(unittest.TestCase):
   def test_default_ip_configuration(self):
     pipeline_options = PipelineOptions(
         ['--temp_location', 'gs://any-location/temp'])
-    env = apiclient.Environment([], pipeline_options, '2.0.0')
+    env = apiclient.Environment([],
+                                pipeline_options,
+                                '2.0.0',
+                                FAKE_PIPELINE_URL)
     self.assertEqual(env.proto.workerPools[0].ipConfiguration, None)
 
   def test_public_ip_configuration(self):
     pipeline_options = PipelineOptions(
         ['--temp_location', 'gs://any-location/temp',
          '--use_public_ips'])
-    env = apiclient.Environment([], pipeline_options, '2.0.0')
+    env = apiclient.Environment([],
+                                pipeline_options,
+                                '2.0.0',
+                                FAKE_PIPELINE_URL)
     self.assertEqual(
         env.proto.workerPools[0].ipConfiguration,
         dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC)
@@ -139,7 +149,10 @@ class UtilTest(unittest.TestCase):
     pipeline_options = PipelineOptions(
         ['--temp_location', 'gs://any-location/temp',
          '--no_use_public_ips'])
-    env = apiclient.Environment([], pipeline_options, '2.0.0')
+    env = apiclient.Environment([],
+                                pipeline_options,
+                                '2.0.0',
+                                FAKE_PIPELINE_URL)
     self.assertEqual(
         env.proto.workerPools[0].ipConfiguration,
         dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE)
@@ -158,7 +171,8 @@ class UtilTest(unittest.TestCase):
         mock.MagicMock(return_value=distribution)):
       env = apiclient.Environment([], #packages
                                   pipeline_options,
-                                  '2.0.0') #any environment version
+                                  '2.0.0',
+                                  FAKE_PIPELINE_URL) #any environment version
       self.assertIn(override, env.proto.experiments)
 
   @mock.patch('apache_beam.runners.dataflow.internal.dependency.'
@@ -176,7 +190,8 @@ class UtilTest(unittest.TestCase):
         mock.Mock(side_effect=pkg_resources.DistributionNotFound())):
       env = apiclient.Environment([], #packages
                                   pipeline_options,
-                                  '2.0.0') #any environment version
+                                  '2.0.0',
+                                  FAKE_PIPELINE_URL) #any environment version
       self.assertIn(override, env.proto.experiments)
 
   @mock.patch('apache_beam.runners.dataflow.internal.dependency.'
@@ -190,7 +205,8 @@ class UtilTest(unittest.TestCase):
         mock.Mock(side_effect=pkg_resources.DistributionNotFound())):
       env = apiclient.Environment([], #packages
                                   pipeline_options,
-                                  '2.0.0') #any environment version
+                                  '2.0.0',
+                                  FAKE_PIPELINE_URL) #any environment version
       if env.proto.experiments:
         for experiment in env.proto.experiments:
           self.assertNotIn('runner_harness_container_image=', experiment)
@@ -199,7 +215,7 @@ class UtilTest(unittest.TestCase):
     pipeline_options = PipelineOptions(
         ['--project', 'test_project', '--job_name', 'test_job_name',
          '--temp_location', 'gs://test-location/temp'])
-    job = apiclient.Job(pipeline_options)
+    job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
     self.assertIsNone(job.proto.labels)
 
     pipeline_options = PipelineOptions(
@@ -210,7 +226,7 @@ class UtilTest(unittest.TestCase):
          '--label', 'key3=value3',
          '--labels', 'key4=value4',
          '--labels', 'key5'])
-    job = apiclient.Job(pipeline_options)
+    job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
     self.assertEqual(5, len(job.proto.labels.additionalProperties))
     self.assertEqual('key1', job.proto.labels.additionalProperties[0].key)
     self.assertEqual('value1', job.proto.labels.additionalProperties[0].value)

http://git-wip-us.apache.org/repos/asf/beam/blob/7d59c96e/sdks/python/apache_beam/runners/dataflow/internal/names.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index be67224..559b445 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -21,6 +21,8 @@
 # Standard file names used for staging files.
 PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
 DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
+STAGED_PIPELINE_FILENAME = "pipeline.pb"
+STAGED_PIPELINE_URL_METADATA_FIELD = "pipeline_url"
 
 # String constants related to sources framework
 SOURCE_FORMAT = 'custom_source'

http://git-wip-us.apache.org/repos/asf/beam/blob/7d59c96e/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
index 88afe8a..82eb76b 100644
--- a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
@@ -87,7 +87,8 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
                             '--temp_location=/dev/null',
                             '--template_location=/bad/path',
                             '--no_auth=True']))
-    remote_runner.job = apiclient.Job(pipeline._options)
+    remote_runner.job = apiclient.Job(pipeline._options,
+                                      pipeline.to_runner_api())
 
     with self.assertRaises(IOError):
       pipeline.run().wait_until_finish()

http://git-wip-us.apache.org/repos/asf/beam/blob/7d59c96e/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index fbc5f33..1c05e97 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -847,7 +847,8 @@ class ParDo(PTransformWithSideInputs):
     return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
   def to_runner_api_parameter(self, context):
-    assert self.__class__ is ParDo
+    assert isinstance(self, ParDo), \
+        "expected instance of ParDo, but got %s" % self.__class__
     picked_pardo_fn_data = pickler.dumps(self._pardo_fn_data())
     return (
         urns.PARDO_TRANSFORM,
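
The relaxed assertion matters because to_runner_api_parameter() can be reached from ParDo subclasses, which the old identity check (self.__class__ is ParDo) rejected. A small, hypothetical illustration of the difference (class names are made up for this example):

    import apache_beam as beam
    from apache_beam.transforms.core import ParDo

    class _IdentityFn(beam.DoFn):         # illustrative DoFn
        def process(self, element):
            yield element

    class _CustomParDo(ParDo):            # hypothetical ParDo subclass
        pass

    transform = _CustomParDo(_IdentityFn())
    print(transform.__class__ is ParDo)   # False: old assert would have failed
    print(isinstance(transform, ParDo))   # True: relaxed assert accepts it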


[2/2] beam git commit: This closes #4010: [BEAM-3074] Stage the pipeline in Python DataflowRunner

Posted by ke...@apache.org.
This closes #4010: [BEAM-3074] Stage the pipeline in Python DataflowRunner

  Stage the pipeline in Python DataflowRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f680307
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f680307
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f680307

Branch: refs/heads/master
Commit: 1f680307889e03fae3746585c6baf963b2bdf9ba
Parents: 0c22113 7d59c96
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 26 18:43:14 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 26 18:43:14 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_runner.py         |  5 ++-
 .../runners/dataflow/internal/apiclient.py      | 23 +++++++++++--
 .../runners/dataflow/internal/apiclient_test.py | 36 ++++++++++++++------
 .../runners/dataflow/internal/names.py          |  2 ++
 .../runners/dataflow/template_runner_test.py    |  3 +-
 sdks/python/apache_beam/transforms/core.py      |  3 +-
 6 files changed, 57 insertions(+), 15 deletions(-)
----------------------------------------------------------------------