You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/30 22:57:05 UTC

[beam] branch master updated: [Beam 7447] Add Snippets for Options pattern. (#8710)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 77cf84c  [Beam 7447] Add Snippets for Options pattern. (#8710)
77cf84c is described below

commit 77cf84c634381495d45a112a9d147ad69394c0d4
Author: Reza Rokni <75...@users.noreply.github.com>
AuthorDate: Fri May 31 06:56:52 2019 +0800

    [Beam 7447] Add Snippets for Options pattern. (#8710)
    
    * BEAM-7447 Add Snippets for Options pattern
    * lint/bug fixes.
---
 .../apache/beam/examples/snippets/Snippets.java    | 48 +++++++++++++++++++++
 .../apache_beam/examples/snippets/snippets.py      | 49 ++++++++++++++++++++++
 2 files changed, 97 insertions(+)

diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 8216bba..4d01feb 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -42,6 +42,11 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
 import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -605,4 +610,47 @@ public class Snippets {
 
   // [END SideInputPatternSlowUpdateGlobalWindowSnip1]
 
+  // [START AccessingValueProviderInfoAfterRunSnip1]
+
+  /** Sample of PipelineOptions with a ValueProvider option argument. */
+  public interface MyOptions extends PipelineOptions {
+    @Description("My option")
+    @Default.String("Hello world!")
+    ValueProvider<String> getStringValue();
+
+    void setStringValue(ValueProvider<String> value);
+  }
+
+  public static void accessingValueProviderInfoAfterRunSnip1(String[] args) {
+
+    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
+
+    // Create pipeline.
+    Pipeline p = Pipeline.create(options);
+
+    // Add a branch for logging the ValueProvider value.
+    p.apply(Create.of(1))
+        .apply(
+            ParDo.of(
+                new DoFn<Integer, Integer>() {
+
+                  // Define the DoFn that logs the ValueProvider value.
+                  @ProcessElement
+                  public void process(ProcessContext c) {
+
+                    MyOptions ops = c.getPipelineOptions().as(MyOptions.class);
+                    // This example logs the ValueProvider value, but you could store it by
+                    // pushing it to an external database.
+
+                    LOG.info("Option StringValue was {}", ops.getStringValue());
+                  }
+                }));
+
+    // The main pipeline.
+    p.apply(Create.of(1, 2, 3, 4)).apply(Sum.integersGlobally());
+
+    p.run();
+  }
+
+  // [END AccessingValueProviderInfoAfterRunSnip1]
 }
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 5e7f9bf..7111c99 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1352,3 +1352,52 @@ def file_process_pattern_access_metadata():
                           | beam.Map(lambda x: (x.metadata.path,
                                                 x.read_utf8())))
   # [END FileProcessPatternAccessMetadataSnip1]
+
+
+def accessing_valueprovider_info_after_run():
+  # [START AccessingValueProviderInfoAfterRunSnip1]
+  import logging
+
+  import apache_beam as beam
+  from apache_beam.options.pipeline_options import PipelineOptions
+  from apache_beam.utils.value_provider import RuntimeValueProvider
+  from apache_beam.io import WriteToText
+
+  class MyOptions(PipelineOptions):
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_value_provider_argument('--string_value', type=str)
+
+  class LogValueProvidersFn(beam.DoFn):
+    def __init__(self, string_vp):
+      self.string_vp = string_vp
+
+    # Define the DoFn that logs the ValueProvider value.
+    # The DoFn is called when creating the pipeline branch.
+    # This example logs the ValueProvider value, but
+    # you could store it by pushing it to an external database.
+    def process(self, an_int):
+      logging.info('The string_value is %s' % self.string_vp.get())
+      # Another option (where you don't need to pass the value at all) is:
+      logging.info('The string value is %s' %
+                   RuntimeValueProvider.get_value('string_value', str, ''))
+
+  pipeline_options = PipelineOptions()
+  # Create pipeline.
+  p = beam.Pipeline(options=pipeline_options)
+
+  my_options = pipeline_options.view_as(MyOptions)
+  # Add a branch for logging the ValueProvider value.
+  _ = (p
+       | beam.Create([None])
+       | 'LogValueProvs' >> beam.ParDo(
+           LogValueProvidersFn(my_options.string_value)))
+
+  # The main pipeline.
+  result_pc = (p
+               | "main_pc" >> beam.Create([1, 2, 3])
+               | beam.combiners.Sum.Globally())
+
+  p.run().wait_until_finish()
+
+  # [END AccessingValueProviderInfoAfterRunSnip1]