You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2021/03/19 17:38:43 UTC

[beam] branch master updated: Add an option to create Dataflow pipelines from a snapshot for python sdk (#14278)

This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 22a8b18  Add an option to create Dataflow pipelines from a snapshot for python sdk (#14278)
22a8b18 is described below

commit 22a8b18633098b2725aaf741023ca3308d7458ab
Author: Andy Xu <50...@users.noreply.github.com>
AuthorDate: Fri Mar 19 10:37:43 2021 -0700

    Add an option to create Dataflow pipelines from a snapshot for python sdk (#14278)
---
 sdks/python/apache_beam/options/pipeline_options.py        |  4 ++++
 .../apache_beam/runners/dataflow/internal/apiclient.py     |  3 +++
 .../runners/dataflow/internal/apiclient_test.py            | 14 ++++++++++++++
 3 files changed, 21 insertions(+)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 9b5b25c..181b536 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -673,6 +673,10 @@ class GoogleCloudOptions(PipelineOptions):
         help='Set a Google Cloud KMS key name to be used in '
         'Dataflow state operations (GBK, Streaming).')
     parser.add_argument(
+        '--create_from_snapshot',
+        default=None,
+        help='The snapshot from which the job should be created.')
+    parser.add_argument(
         '--flexrs_goal',
         default=None,
         choices=['COST_OPTIMIZED', 'SPEED_OPTIMIZED'],
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 137d74d..1b7b4af 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -505,6 +505,9 @@ class Job(object):
           self.proto.transformNameMapping.additionalProperties.append(
               dataflow.Job.TransformNameMappingValue.AdditionalProperty(
                   key=key, value=value))
+    if self.google_cloud_options.create_from_snapshot:
+      self.proto.createdFromSnapshotId = (
+          self.google_cloud_options.create_from_snapshot)
     # Labels.
     if self.google_cloud_options.labels:
       self.proto.labels = dataflow.Job.LabelsValue()
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 4b3771e..cf37c84 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -815,6 +815,20 @@ class UtilTest(unittest.TestCase):
     job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
     self.assertIsNotNone(job.proto.transformNameMapping)
 
+  def test_created_from_snapshot_id(self):
+    pipeline_options = PipelineOptions([
+        '--project',
+        'test_project',
+        '--job_name',
+        'test_job_name',
+        '--temp_location',
+        'gs://test-location/temp',
+        '--create_from_snapshot',
+        'test_snapshot_id'
+    ])
+    job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
+    self.assertEqual('test_snapshot_id', job.proto.createdFromSnapshotId)
+
   def test_labels(self):
     pipeline_options = PipelineOptions([
         '--project',