Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/09 22:03:37 UTC
[beam] branch master updated: Add Snippets for 3 new Beam Patterns: 2 x FilePattern, 1 x SideInput Pattern
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 759ea22 Add Snippets for 3 new Beam Patterns: 2 x FilePattern, 1 x SideInput Pattern
new 311edfd Merge pull request #8538 from rezarokni/Beam-Patterns
759ea22 is described below
commit 759ea2278a4febb156bba63b16e7c8e091e27cb5
Author: rarokni <re...@google.com>
AuthorDate: Thu May 9 08:27:11 2019 +0800
Add Snippets for 3 new Beam Patterns: 2 x FilePattern, 1 x SideInput Pattern
---
.../apache/beam/examples/snippets/Snippets.java | 142 +++++++++++++++++++++
.../apache_beam/examples/snippets/snippets.py | 17 +++
2 files changed, 159 insertions(+)
diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 04f36fd..8216bba 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -24,12 +24,18 @@ import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -40,16 +46,31 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Code snippets used in webdocs. */
public class Snippets {
@@ -463,4 +484,125 @@ public class Snippets {
// [END CoGroupByKeyTuple]
return contactLines;
}
+
+ public static void fileProcessPattern() throws Exception {
+ Pipeline p = Pipeline.create();
+
+ // [START FileProcessPatternProcessNewFilesSnip1]
+ // This produces PCollection<MatchResult.Metadata>
+ p.apply(
+ FileIO.match()
+ .filepattern("...")
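+ // Check for new files matching the filepattern every 30 seconds; stop
+ // watching if no new files appear within one hour.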
+ .continuously(
+ Duration.standardSeconds(30),
+ Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
+ // [END FileProcessPatternProcessNewFilesSnip1]
+
+ // [START FileProcessPatternProcessNewFilesSnip2]
+ // This produces PCollection<String>
+ p.apply(
+ TextIO.read()
+ .from("<path-to-files>/*")
+ .watchForNewFiles(
+ // Check for new files every minute
+ Duration.standardMinutes(1),
+ // Stop watching the filepattern if no new files appear within an hour
+ Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
+ // [END FileProcessPatternProcessNewFilesSnip2]
+
+ // [START FileProcessPatternAccessMetadataSnip1]
+ p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
+ // withCompression can be omitted; by default, compression is detected from the filename.
+ .apply(FileIO.readMatches().withCompression(Compression.GZIP))
+ .apply(
+ ParDo.of(
+ new DoFn<FileIO.ReadableFile, String>() {
+ @ProcessElement
+ public void process(@Element FileIO.ReadableFile file) {
+ // We now have access to the file and its metadata
+ LOG.info("File Metadata resourceId is {} ", file.getMetadata().resourceId());
+ }
+ }));
+ // [END FileProcessPatternAccessMetadataSnip1]
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(Snippets.class);
+
+ // [START SideInputPatternSlowUpdateGlobalWindowSnip1]
+ public static void sideInputPatterns() {
+ // This pipeline uses View.asSingleton to build a slowly updating side input,
+ // with a dummy external service as the illustration. Run in debug mode to see the output.
+ Pipeline p = Pipeline.create();
+
+ // Create the slowly updating side input
+
+ PCollectionView<Map<String, String>> map =
+ p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
+ .apply(
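+ // Use the global window with a repeated processing-time trigger so that
+ // each new element fires a pane and refreshes the side input value.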
+ Window.<Long>into(new GlobalWindows())
+ .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
+ .discardingFiredPanes())
+ .apply(
+ ParDo.of(
+ new DoFn<Long, Map<String, String>>() {
+
+ @ProcessElement
+ public void process(
+ @Element Long input, OutputReceiver<Map<String, String>> o) {
+ // Do any external reads needed here...
+ // We will make use of our dummy external service.
+ // Every time this triggers, the complete map will be replaced
+ // with the one read from the service.
+ o.output(DummyExternalService.readDummyData());
+ }
+ }))
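+ // View.asSingleton turns the latest fired pane into the side input value;
+ // with discarding panes, each firing replaces the previous value.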
+ .apply(View.asSingleton());
+
+ // ---- Consume the slowly updating side input
+
+ // GenerateSequence is only used here to generate dummy data for this illustration.
+ // You would use your real source, for example PubSubIO, KafkaIO, etc.
+ p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
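+ // Fixed windows on the main input; each window's elements read the
+ // side input value that is current when the window is processed.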
+ .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
+ .apply(Sum.longsGlobally().withoutDefaults())
+ .apply(
+ ParDo.of(
+ new DoFn<Long, KV<Long, Long>>() {
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ Map<String, String> keyMap = c.sideInput(map);
+ c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
+
+ LOG.debug(
+ "Value is {} key A is {} and key B is {}",
+ c.element(),
+ keyMap.get("Key_A"),
+ keyMap.get("Key_B"));
+ }
+ })
+ .withSideInputs(map));
+ }
+
+ /** Dummy class representing a pretend external service. */
+ public static class DummyExternalService {
+
+ public static Map<String, String> readDummyData() {
+
+ Map<String, String> map = new HashMap<>();
+ Instant now = Instant.now();
+
+ // Note the lowercase mm/ss: in Joda-Time, MM is month-of-year and SS is fraction-of-second.
+ DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:mm:ss");
+
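+ // Key_A uses the explicit formatter above; Key_B uses the default ISO-8601 toString().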
+ map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
+ map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());
+
+ return map;
+ }
+ }
+
+ // [END SideInputPatternSlowUpdateGlobalWindowSnip1]
+
}
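As a quick illustration, a minimal driver sketch for the new snippet methods might look like the following. The SnippetsDemo class is hypothetical and not part of this patch; it assumes the Beam examples module is on the classpath. Note that both methods construct a Pipeline but never call p.run(), so invoking them only exercises pipeline construction:

import org.apache.beam.examples.snippets.Snippets;

public class SnippetsDemo {
  public static void main(String[] args) throws Exception {
    // Builds (but does not run) the file-processing pipelines.
    Snippets.fileProcessPattern();
    // Builds (but does not run) the slowly updating side input pipeline.
    Snippets.sideInputPatterns();
  }
}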
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 7a04c4c..5e7f9bf 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1335,3 +1335,20 @@ class Count(beam.PTransform):
| 'PairWithOne' >> beam.Map(lambda v: (v, 1))
| beam.CombinePerKey(sum))
# [END model_library_transforms_count]
+
+
+def file_process_pattern_access_metadata():
+
+ import apache_beam as beam
+ from apache_beam.io import fileio
+
+ # [START FileProcessPatternAccessMetadataSnip1]
+ with beam.Pipeline() as p:
+ readable_files = (p
+ | fileio.MatchFiles('hdfs://path/to/*.txt')
+ | fileio.ReadMatches()
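+ # Reshuffle breaks fusion so that reading the matched
+ # files can be rebalanced across workers.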
+ | beam.Reshuffle())
+ files_and_contents = (readable_files
+ | beam.Map(lambda x: (x.metadata.path,
+ x.read_utf8())))
+ # [END FileProcessPatternAccessMetadataSnip1]