Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/09 22:03:37 UTC

[beam] branch master updated: Add Snippets for 3 new Beam Patterns 2 X FilePatterns 1 X SideInput Pattern

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 759ea22  Add Snippets for 3 new Beam Patterns 2 X FilePatterns 1 X SideInput Pattern
     new 311edfd  Merge pull request #8538 from rezarokni/Beam-Patterns
759ea22 is described below

commit 759ea2278a4febb156bba63b16e7c8e091e27cb5
Author: rarokni <re...@google.com>
AuthorDate: Thu May 9 08:27:11 2019 +0800

    Add Snippets for 3 new Beam Patterns 2 X FilePatterns 1 X SideInput Pattern
---
 .../apache/beam/examples/snippets/Snippets.java    | 142 +++++++++++++++++++++
 .../apache_beam/examples/snippets/snippets.py      |  17 +++
 2 files changed, 159 insertions(+)

diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 04f36fd..8216bba 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -24,12 +24,18 @@ import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -40,16 +46,31 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
 import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Code snippets used in webdocs. */
 public class Snippets {
@@ -463,4 +484,125 @@ public class Snippets {
     // [END CoGroupByKeyTuple]
     return contactLines;
   }
+
+  public static void fileProcessPattern() throws Exception {
+    Pipeline p = Pipeline.create();
+
+    // [START FileProcessPatternProcessNewFilesSnip1]
+    // This produces PCollection<MatchResult.Metadata>
+    p.apply(
+        FileIO.match()
+            .filepattern("...")
+            .continuously(
+                Duration.standardSeconds(30),
+                Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
+    // [END FileProcessPatternProcessNewFilesSnip1]
+
+    // [START FileProcessPatternProcessNewFilesSnip2]
+    // This produces PCollection<String>
+    p.apply(
+        TextIO.read()
+            .from("<path-to-files>/*")
+            .watchForNewFiles(
+                // Check for new files every minute
+                Duration.standardMinutes(1),
+                // Stop watching the filepattern if no new files appear within an hour
+                Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
+    // [END FileProcessPatternProcessNewFilesSnip2]
+
+    // [START FileProcessPatternAccessMetadataSnip1]
+    p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
+        // withCompression can be omitted - by default compression is detected from the filename.
+        .apply(FileIO.readMatches().withCompression(Compression.GZIP))
+        .apply(
+            ParDo.of(
+                new DoFn<FileIO.ReadableFile, String>() {
+                  @ProcessElement
+                  public void process(@Element FileIO.ReadableFile file) {
+                    // We now have access to the file and its metadata
+                    LOG.info("File Metadata resourceId is {} ", file.getMetadata().resourceId());
+                  }
+                }));
+    // [END FileProcessPatternAccessMetadataSnip1]
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(Snippets.class);
+
+  // [START SideInputPatternSlowUpdateGlobalWindowSnip1]
+  public static void sideInputPatterns() {
+    // This pipeline uses View.asSingleton and a dummy external service for illustration.
+    // Run in debug mode to see the output.
+    Pipeline p = Pipeline.create();
+
+    // Create the slowly-updating side input.
+
+    PCollectionView<Map<String, String>> map =
+        p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
+            .apply(
+                Window.<Long>into(new GlobalWindows())
+                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
+                    .discardingFiredPanes())
+            .apply(
+                ParDo.of(
+                    new DoFn<Long, Map<String, String>>() {
+
+                      @ProcessElement
+                      public void process(
+                          @Element Long input, OutputReceiver<Map<String, String>> o) {
+                        // Do any external reads needed here...
+                        // We will make use of our dummy external service.
+                        // Every time this triggers, the complete map will be
+                        // replaced with that read from the service.
+                        o.output(DummyExternalService.readDummyData());
+                      }
+                    }))
+            .apply(View.asSingleton());
+
+    // ---- Consume the slowly-updating side input
+
+    // GenerateSequence is used here only to generate dummy data for this illustration.
+    // In a real pipeline you would use an unbounded source such as PubsubIO or KafkaIO
+    // (a hedged sketch of that substitution follows this diff).
+    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
+        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
+        .apply(Sum.longsGlobally().withoutDefaults())
+        .apply(
+            ParDo.of(
+                    new DoFn<Long, KV<Long, Long>>() {
+
+                      @ProcessElement
+                      public void process(ProcessContext c) {
+                        Map<String, String> keyMap = c.sideInput(map);
+                        c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
+
+                        LOG.debug(
+                            "Value is {} key A is {} and key B is {}",
+                            c.element(),
+                            keyMap.get("Key_A"),
+                            keyMap.get("Key_B"));
+                      }
+                    })
+                .withSideInputs(map));
+  }
+
+  /** Dummy class representing a pretend external service. */
+  public static class DummyExternalService {
+
+    public static Map<String, String> readDummyData() {
+
+      Map<String, String> map = new HashMap<>();
+      Instant now = Instant.now();
+
+      // Joda-Time pattern: lowercase mm/ss are minute-of-hour and second-of-minute;
+      // "MM" is month-of-year and "SS" is fraction-of-second, which is not intended here.
+      DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:mm:ss");
+
+      map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
+      map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());
+
+      return map;
+    }
+  }
+
+  // [END SideInputPatternSlowUpdateGlobalWindowSnip1]
+
 }
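
The consumer above uses GenerateSequence only as stand-in data. What follows is
a minimal, self-contained sketch of the same slowly-updating side-input pattern
wired to a real unbounded source; PubsubIO, the placeholder topic path, and the
class name are illustrative assumptions, not part of this commit.

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class SlowlyUpdatingSideInputSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Rebuild the side input every five minutes, as the snippet above does
    // every five seconds.
    PCollectionView<Map<String, String>> map =
        p.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
            .apply(
                Window.<Long>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes())
            .apply(
                ParDo.of(
                    new DoFn<Long, Map<String, String>>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        // Stand-in for the external read; replace with a real service call.
                        Map<String, String> m = new HashMap<>();
                        m.put("Key_A", Instant.now().toString());
                        c.output(m);
                      }
                    }))
            .apply(View.asSingleton());

    // Consume: every element read from the unbounded source sees the latest
    // side-input value. The topic path is a placeholder.
    p.apply(PubsubIO.readStrings().fromTopic("projects/<project>/topics/<topic>"))
        .apply(
            ParDo.of(
                    new DoFn<String, String>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        Map<String, String> keyMap = c.sideInput(map);
                        c.output(c.element() + " -> " + keyMap.get("Key_A"));
                      }
                    })
                .withSideInputs(map));

    p.run();
  }
}

Because the side input lives in the global window with a repeatedly firing
processing-time trigger and discarded panes, each firing replaces the singleton
value that downstream workers observe.
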
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 7a04c4c..5e7f9bf 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1335,3 +1335,20 @@ class Count(beam.PTransform):
         | 'PairWithOne' >> beam.Map(lambda v: (v, 1))
         | beam.CombinePerKey(sum))
 # [END model_library_transforms_count]
+
+
+def file_process_pattern_access_metadata():
+
+  import apache_beam as beam
+  from apache_beam.io import fileio
+
+  # [START FileProcessPatternAccessMetadataSnip1]
+  with beam.Pipeline() as p:
+    readable_files = (p
+                      | fileio.MatchFiles('hdfs://path/to/*.txt')
+                      | fileio.ReadMatches()
+                      | beam.Reshuffle())  # rebalance files across workers (see sketch below)
+    files_and_contents = (readable_files
+                          | beam.Map(lambda x: (x.metadata.path,
+                                                x.read_utf8())))
+  # [END FileProcessPatternAccessMetadataSnip1]
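
The beam.Reshuffle() step above prevents fusion of the match and read stages,
so per-file work can be balanced across workers. Below is a hedged Java
analogue of that step using Reshuffle.viaRandomKey(); the class name and the
final map stage are illustrative, not part of this commit.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReshuffleMatchedFilesSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
        .apply(FileIO.readMatches().withCompression(Compression.GZIP))
        // Break fusion here so the per-file reads that follow are spread
        // across workers, mirroring beam.Reshuffle() in the Python snippet.
        .apply(Reshuffle.viaRandomKey())
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via((FileIO.ReadableFile f) -> f.getMetadata().resourceId().toString()));
    p.run();
  }
}

Without the reshuffle, matching and reading may fuse into a single stage, and
a single worker can end up reading every matched file.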