You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2021/03/19 17:38:43 UTC
[beam] branch master updated: Add an option to create Dataflow
pipelines from a snapshot for python sdk (#14278)
This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 22a8b18 Add an option to create Dataflow pipelines from a snapshot for python sdk (#14278)
22a8b18 is described below
commit 22a8b18633098b2725aaf741023ca3308d7458ab
Author: Andy Xu <50...@users.noreply.github.com>
AuthorDate: Fri Mar 19 10:37:43 2021 -0700
Add an option to create Dataflow pipelines from a snapshot for python sdk (#14278)
---
sdks/python/apache_beam/options/pipeline_options.py | 4 ++++
.../apache_beam/runners/dataflow/internal/apiclient.py | 3 +++
.../runners/dataflow/internal/apiclient_test.py | 14 ++++++++++++++
3 files changed, 21 insertions(+)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 9b5b25c..181b536 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -673,6 +673,10 @@ class GoogleCloudOptions(PipelineOptions):
help='Set a Google Cloud KMS key name to be used in '
'Dataflow state operations (GBK, Streaming).')
parser.add_argument(
+ '--create_from_snapshot',
+ default=None,
+ help='The snapshot from which the job should be created.')
+ parser.add_argument(
'--flexrs_goal',
default=None,
choices=['COST_OPTIMIZED', 'SPEED_OPTIMIZED'],
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 137d74d..1b7b4af 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -505,6 +505,9 @@ class Job(object):
self.proto.transformNameMapping.additionalProperties.append(
dataflow.Job.TransformNameMappingValue.AdditionalProperty(
key=key, value=value))
+ if self.google_cloud_options.create_from_snapshot:
+ self.proto.createdFromSnapshotId = (
+ self.google_cloud_options.create_from_snapshot)
# Labels.
if self.google_cloud_options.labels:
self.proto.labels = dataflow.Job.LabelsValue()
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 4b3771e..cf37c84 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -815,6 +815,20 @@ class UtilTest(unittest.TestCase):
job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
self.assertIsNotNone(job.proto.transformNameMapping)
+ def test_created_from_snapshot_id(self):
+ pipeline_options = PipelineOptions([
+ '--project',
+ 'test_project',
+ '--job_name',
+ 'test_job_name',
+ '--temp_location',
+ 'gs://test-location/temp',
+ '--create_from_snapshot',
+ 'test_snapshot_id'
+ ])
+ job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
+ self.assertEqual('test_snapshot_id', job.proto.createdFromSnapshotId)
+
def test_labels(self):
pipeline_options = PipelineOptions([
'--project',