You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/30 22:57:05 UTC
[beam] branch master updated: [Beam 7447] Add Snippets for Options pattern. (#8710)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 77cf84c [Beam 7447] Add Snippets for Options pattern. (#8710)
77cf84c is described below
commit 77cf84c634381495d45a112a9d147ad69394c0d4
Author: Reza Rokni <75...@users.noreply.github.com>
AuthorDate: Fri May 31 06:56:52 2019 +0800
[Beam 7447] Add Snippets for Options pattern. (#8710)
* BEAM-7447 Add Snippets for Options pattern
* lint/bug fixes.
---
.../apache/beam/examples/snippets/Snippets.java | 48 +++++++++++++++++++++
.../apache_beam/examples/snippets/snippets.py | 49 ++++++++++++++++++++++
2 files changed, 97 insertions(+)
diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 8216bba..4d01feb 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -42,6 +42,11 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
@@ -605,4 +610,47 @@ public class Snippets {
// [END SideInputPatternSlowUpdateGlobalWindowSnip1]
+ // [START AccessingValueProviderInfoAfterRunSnip1]
+
+  /**
+   * Example {@link PipelineOptions} sub-interface that declares a single option whose type is a
+   * {@link ValueProvider} rather than a plain value.
+   *
+   * <p>The getter is annotated with the default {@code "Hello world!"}.
+   */
+  public interface MyOptions extends PipelineOptions {
+    /** Setter paired with {@link #getStringValue()}. */
+    void setStringValue(ValueProvider<String> stringValue);
+
+    /** Getter for the {@link ValueProvider}-typed string option. */
+    @Default.String("Hello world!")
+    @Description("My option")
+    ValueProvider<String> getStringValue();
+  }
+
+  /**
+   * Builds and runs a pipeline that has, next to its main branch, a side branch whose only job is
+   * to log the {@link MyOptions#getStringValue()} option from inside a {@link DoFn}.
+   *
+   * @param args command-line arguments parsed into {@link MyOptions}
+   */
+  public static void accessingValueProviderInfoAfterRunSnip1(String[] args) {
+
+    MyOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
+
+    // Create pipeline.
+    Pipeline pipeline = Pipeline.create(options);
+
+    // Define the DoFn that logs the ValueProvider value.
+    DoFn<Integer, Integer> logOptionFn =
+        new DoFn<Integer, Integer>() {
+
+          @ProcessElement
+          public void process(ProcessContext context) {
+
+            // The options are read from the processing context rather than captured
+            // from the enclosing method.
+            MyOptions runtimeOptions = context.getPipelineOptions().as(MyOptions.class);
+
+            // This example logs the ValueProvider value, but you could store it by
+            // pushing it to an external database.
+            LOG.info("Option StringValue was {}", runtimeOptions.getStringValue());
+          }
+        };
+
+    // Add a branch for logging the ValueProvider value.
+    pipeline.apply(Create.of(1)).apply(ParDo.of(logOptionFn));
+
+    // The main pipeline.
+    pipeline.apply(Create.of(1, 2, 3, 4)).apply(Sum.integersGlobally());
+
+    pipeline.run();
+  }
+
+ // [END AccessingValueProviderInfoAfterRunSnip1]
}
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 5e7f9bf..7111c99 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1352,3 +1352,52 @@ def file_process_pattern_access_metadata():
| beam.Map(lambda x: (x.metadata.path,
x.read_utf8())))
# [END FileProcessPatternAccessMetadataSnip1]
+
+
+def accessing_valueprovider_info_after_run():
+ # [START AccessingValueProviderInfoAfterRunSnip1]
+ import logging
+
+ import apache_beam as beam
+ from apache_beam.options.pipeline_options import PipelineOptions
+ from apache_beam.utils.value_provider import RuntimeValueProvider
+ from apache_beam.io import WriteToText
+
+ class MyOptions(PipelineOptions):
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_value_provider_argument('--string_value', type=str)
+
+ class LogValueProvidersFn(beam.DoFn):
+ def __init__(self, string_vp):
+ self.string_vp = string_vp
+
+ # Define the DoFn that logs the ValueProvider value.
+ # The DoFn is called when creating the pipeline branch.
+ # This example logs the ValueProvider value, but
+ # you could store it by pushing it to an external database.
+ def process(self, an_int):
+ logging.info('The string_value is %s' % self.string_vp.get())
+ # Another option (where you don't need to pass the value at all) is:
+ logging.info('The string value is %s' %
+ RuntimeValueProvider.get_value('string_value', str, ''))
+
+ pipeline_options = PipelineOptions()
+ # Create pipeline.
+ p = beam.Pipeline(options=pipeline_options)
+
+ my_options = pipeline_options.view_as(MyOptions)
+ # Add a branch for logging the ValueProvider value.
+ _ = (p
+ | beam.Create([None])
+ | 'LogValueProvs' >> beam.ParDo(
+ LogValueProvidersFn(my_options.string_value)))
+
+ # The main pipeline.
+ result_pc = (p
+ | "main_pc" >> beam.Create([1, 2, 3])
+ | beam.combiners.Sum.Globally())
+
+ p.run().wait_until_finish()
+
+ # [END AccessingValueProviderInfoAfterRunSnip1]