Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:21 UTC
[26/50] [abbrv] incubator-beam git commit: [flink] adjust directories
according to package name
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
new file mode 100644
index 0000000..ab23b92
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -0,0 +1,452 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.beam.runners.flink.examples;
+
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.GcsOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.Keys;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.transforms.Values;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.WithKeys;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.util.GcsUtil;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.PDone;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * An example that computes a basic TF-IDF search table for a directory or GCS prefix.
+ *
+ * <p> Concepts: joining data; side inputs; logging
+ *
+ * <p> To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * }</pre>
+ * and a local output file or output prefix on GCS:
+ * <pre>{@code
+ * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+ * }</pre>
+ *
+ * <p> To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=BlockingDataflowPipelineRunner
+ * }</pre>
+ * and an output prefix on GCS:
+ * <pre>{@code
+ * --output=gs://YOUR_OUTPUT_PREFIX
+ * }</pre>
+ *
+ * <p> The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with
+ * {@code --input}.
+ */
+public class TFIDF {
+ /**
+ * Options supported by {@link TFIDF}.
+ * <p>
+ * Inherits standard configuration options.
+ */
+ private interface Options extends PipelineOptions, FlinkPipelineOptions {
+ @Description("Path to the directory or GCS prefix containing files to read from")
+ @Default.String("gs://dataflow-samples/shakespeare/")
+ String getInput();
+ void setInput(String value);
+
+ @Description("Prefix of output URI to write to")
+ @Validation.Required
+ String getOutput();
+ void setOutput(String value);
+ }
+
+ /**
+ * Lists documents contained beneath the {@code options.input} prefix/directory.
+ */
+ public static Set<URI> listInputDocuments(Options options)
+ throws URISyntaxException, IOException {
+ URI baseUri = new URI(options.getInput());
+
+ // List all documents in the directory or GCS prefix.
+ URI absoluteUri;
+ if (baseUri.getScheme() != null) {
+ absoluteUri = baseUri;
+ } else {
+ absoluteUri = new URI(
+ "file",
+ baseUri.getAuthority(),
+ baseUri.getPath(),
+ baseUri.getQuery(),
+ baseUri.getFragment());
+ }
+
+ Set<URI> uris = new HashSet<>();
+ if (absoluteUri.getScheme().equals("file")) {
+ File directory = new File(absoluteUri);
+ for (String entry : directory.list()) {
+ File path = new File(directory, entry);
+ uris.add(path.toURI());
+ }
+ } else if (absoluteUri.getScheme().equals("gs")) {
+ GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
+ URI gcsUriGlob = new URI(
+ absoluteUri.getScheme(),
+ absoluteUri.getAuthority(),
+ absoluteUri.getPath() + "*",
+ absoluteUri.getQuery(),
+ absoluteUri.getFragment());
+ for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) {
+ uris.add(entry.toUri());
+ }
+ }
+
+ return uris;
+ }
+
+ /**
+ * Reads the documents at the provided URIs and returns all lines
+ * from the documents, tagged with the URI of the document they are from.
+ */
+ public static class ReadDocuments
+ extends PTransform<PInput, PCollection<KV<URI, String>>> {
+ private static final long serialVersionUID = 0;
+
+ private Iterable<URI> uris;
+
+ public ReadDocuments(Iterable<URI> uris) {
+ this.uris = uris;
+ }
+
+ @Override
+ public Coder<?> getDefaultOutputCoder() {
+ return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of());
+ }
+
+ @Override
+ public PCollection<KV<URI, String>> apply(PInput input) {
+ Pipeline pipeline = input.getPipeline();
+
+ // Create one TextIO.Read transform for each document
+ // and add its output to a PCollectionList
+ PCollectionList<KV<URI, String>> urisToLines =
+ PCollectionList.empty(pipeline);
+
+ // TextIO.Read supports:
+ // - file: URIs and paths locally
+ // - gs: URIs on the service
+ for (final URI uri : uris) {
+ String uriString;
+ if (uri.getScheme().equals("file")) {
+ uriString = new File(uri).getPath();
+ } else {
+ uriString = uri.toString();
+ }
+
+ PCollection<KV<URI, String>> oneUriToLines = pipeline
+ .apply(TextIO.Read.from(uriString)
+ .named("TextIO.Read(" + uriString + ")"))
+ .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
+
+ urisToLines = urisToLines.and(oneUriToLines);
+ }
+
+ return urisToLines.apply(Flatten.<KV<URI, String>>pCollections());
+ }
+ }
+
+ /**
+ * A transform containing a basic TF-IDF pipeline. The input consists of KV objects
+ * where the key is the document's URI and the value is a piece
+ * of the document's content. The output is a mapping from terms to
+ * scores for each document URI.
+ */
+ public static class ComputeTfIdf
+ extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {
+ private static final long serialVersionUID = 0;
+
+ public ComputeTfIdf() { }
+
+ @Override
+ public PCollection<KV<String, KV<URI, Double>>> apply(
+ PCollection<KV<URI, String>> uriToContent) {
+
+ // Compute the total number of documents, and
+ // prepare this singleton PCollectionView for
+ // use as a side input.
+ final PCollectionView<Long> totalDocuments =
+ uriToContent
+ .apply("GetURIs", Keys.<URI>create())
+ .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
+ .apply(Count.<URI>globally())
+ .apply(View.<Long>asSingleton());
+
+ // Create a collection of pairs mapping a URI to each
+ // of the words in the document associated with that URI.
+ PCollection<KV<URI, String>> uriToWords = uriToContent
+ .apply(ParDo.named("SplitWords").of(
+ new DoFn<KV<URI, String>, KV<URI, String>>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey();
+ String line = c.element().getValue();
+ for (String word : line.split("\\W+")) {
+ // Log INFO messages when the word "love" is found.
+ if (word.toLowerCase().equals("love")) {
+ LOG.info("Found {}", word.toLowerCase());
+ }
+
+ if (!word.isEmpty()) {
+ c.output(KV.of(uri, word.toLowerCase()));
+ }
+ }
+ }
+ }));
+
+ // Compute a mapping from each word to the total
+ // number of documents in which it appears.
+ PCollection<KV<String, Long>> wordToDocCount = uriToWords
+ .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
+ .apply(Values.<String>create())
+ .apply("CountDocs", Count.<String>perElement());
+
+ // Compute a mapping from each URI to the total
+ // number of words in the document associated with that URI.
+ PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
+ .apply("GetURIs2", Keys.<URI>create())
+ .apply("CountWords", Count.<URI>perElement());
+
+ // Count, for each (URI, word) pair, the number of
+ // occurrences of that word in the document associated
+ // with the URI.
+ PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords
+ .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement());
+
+ // Rekey the above collection, turning the mapping from
+ // (URI, word) pairs to counts into an isomorphic mapping
+ // from URI to (word, count) pairs, to prepare for a join
+ // by the URI key.
+ PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
+ .apply(ParDo.named("ShiftKeys").of(
+ new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey().getKey();
+ String word = c.element().getKey().getValue();
+ Long occurrences = c.element().getValue();
+ c.output(KV.of(uri, KV.of(word, occurrences)));
+ }
+ }));
+
+ // Prepare to join the mapping of URI to (word, count) pairs with
+ // the mapping of URI to total word counts, by associating
+ // each of the input PCollection<KV<URI, ...>> with
+ // a tuple tag. Each input must have the same key type, URI
+ // in this case. The type parameter of the tuple tag matches
+ // the types of the values for each collection.
+ final TupleTag<Long> wordTotalsTag = new TupleTag<>();
+ final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<>();
+ KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple
+ .of(wordTotalsTag, uriToWordTotal)
+ .and(wordCountsTag, uriToWordAndCount);
+
+ // Perform a CoGroupByKey (a sort of pre-join) on the prepared
+ // inputs. This yields a mapping from URI to a CoGbkResult
+ // (CoGroupByKey Result). The CoGbkResult is a mapping
+ // from the above tuple tags to the values in each input
+ // associated with a particular URI. In this case, each
+ // KV<URI, CoGbkResult> groups a URI with the total number of
+ // words in that document as well as all the (word, count)
+ // pairs for particular words.
+ PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput
+ .apply("CoGroupByUri", CoGroupByKey.<URI>create());
+
+ // Compute a mapping from each word to a (URI, term frequency)
+ // pair for each URI. A word's term frequency for a document
+ // is simply the number of times that word occurs in the document
+ // divided by the total number of words in the document.
+ PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
+ .apply(ParDo.named("ComputeTermFrequencies").of(
+ new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey();
+ Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
+
+ for (KV<String, Long> wordAndCount
+ : c.element().getValue().getAll(wordCountsTag)) {
+ String word = wordAndCount.getKey();
+ Long wordCount = wordAndCount.getValue();
+ Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
+ c.output(KV.of(word, KV.of(uri, termFrequency)));
+ }
+ }
+ }));
+
+ // Compute a mapping from each word to its document frequency.
+ // A word's document frequency in a corpus is the number of
+ // documents in which the word appears divided by the total
+ // number of documents in the corpus. Note how the total number of
+ // documents is passed as a side input; the same value is
+ // presented to each invocation of the DoFn.
+ PCollection<KV<String, Double>> wordToDf = wordToDocCount
+ .apply(ParDo
+ .named("ComputeDocFrequencies")
+ .withSideInputs(totalDocuments)
+ .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ String word = c.element().getKey();
+ Long documentCount = c.element().getValue();
+ Long documentTotal = c.sideInput(totalDocuments);
+ Double documentFrequency = documentCount.doubleValue()
+ / documentTotal.doubleValue();
+
+ c.output(KV.of(word, documentFrequency));
+ }
+ }));
+
+ // Join the term frequency and document frequency
+ // collections, each keyed on the word.
+ final TupleTag<KV<URI, Double>> tfTag = new TupleTag<>();
+ final TupleTag<Double> dfTag = new TupleTag<>();
+ PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple
+ .of(tfTag, wordToUriAndTf)
+ .and(dfTag, wordToDf)
+ .apply(CoGroupByKey.<String>create());
+
+ // Compute a mapping from each word to a (URI, TF-IDF) score
+ // for each URI. There are a variety of definitions of TF-IDF
+ // ("term frequency - inverse document frequency") score;
+ // here we use a basic version that multiplies the term frequency
+ // by the log of the inverse document frequency.
+
+ return wordToUriAndTfAndDf
+ .apply(ParDo.named("ComputeTfIdf").of(
+ new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ String word = c.element().getKey();
+ Double df = c.element().getValue().getOnly(dfTag);
+
+ for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
+ URI uri = uriAndTf.getKey();
+ Double tf = uriAndTf.getValue();
+ Double tfIdf = tf * Math.log(1 / df);
+ c.output(KV.of(word, KV.of(uri, tfIdf)));
+ }
+ }
+ }));
+ }
+
+ // Instantiate Logger.
+ // It is suggested that the user specify the class name of the containing class
+ // (in this case ComputeTfIdf).
+ private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class);
+ }
+
+ /**
+ * A {@link PTransform} to write, in CSV format, a mapping from term and URI
+ * to score.
+ */
+ public static class WriteTfIdf
+ extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> {
+ private static final long serialVersionUID = 0;
+
+ private String output;
+
+ public WriteTfIdf(String output) {
+ this.output = output;
+ }
+
+ @Override
+ public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
+ return wordToUriAndTfIdf
+ .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(String.format("%s,\t%s,\t%f",
+ c.element().getKey(),
+ c.element().getValue().getKey(),
+ c.element().getValue().getValue()));
+ }
+ }))
+ .apply(TextIO.Write
+ .to(output)
+ .withSuffix(".csv"));
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+ options.setRunner(FlinkPipelineRunner.class);
+
+ Pipeline pipeline = Pipeline.create(options);
+ pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+
+ pipeline
+ .apply(new ReadDocuments(listInputDocuments(options)))
+ .apply(new ComputeTfIdf())
+ .apply(new WriteTfIdf(options.getOutput()));
+
+ pipeline.run();
+ }
+}
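
The score computed in ComputeTfIdf above is tf * Math.log(1 / df). As a minimal sketch of that arithmetic (the corpus numbers below are invented for illustration):

    public class TfIdfSketch {
      public static void main(String[] args) {
        // Hypothetical numbers: a word occurring 3 times in a 100-word
        // document, and appearing in 2 out of 10 documents.
        double tf = 3.0 / 100.0;              // term frequency
        double df = 2.0 / 10.0;               // document frequency
        double tfIdf = tf * Math.log(1 / df); // as in ComputeTfIdf
        System.out.println(tfIdf);            // ~0.0483
      }
    }
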
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
new file mode 100644
index 0000000..ba46301
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.examples;
+
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+public class WordCount {
+
+ public static class ExtractWordsFn extends DoFn<String, String> {
+ private final Aggregator<Long, Long> emptyLines =
+ createAggregator("emptyLines", new Sum.SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ if (c.element().trim().isEmpty()) {
+ emptyLines.addValue(1L);
+ }
+
+ // Split the line into words.
+ String[] words = c.element().split("[^a-zA-Z']+");
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }
+
+ public static class CountWords extends PTransform<PCollection<String>,
+ PCollection<KV<String, Long>>> {
+ @Override
+ public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+
+ // Convert lines of text into individual words.
+ PCollection<String> words = lines.apply(
+ ParDo.of(new ExtractWordsFn()));
+
+ // Count the number of times each word occurs.
+ PCollection<KV<String, Long>> wordCounts =
+ words.apply(Count.<String>perElement());
+
+ return wordCounts;
+ }
+ }
+
+ /** A SimpleFunction that converts a Word and Count into a printable string. */
+ public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
+ @Override
+ public String apply(KV<String, Long> input) {
+ return input.getKey() + ": " + input.getValue();
+ }
+ }
+
+ /**
+ * Options supported by {@link WordCount}.
+ * <p>
+ * Inherits standard configuration options.
+ */
+ public interface Options extends PipelineOptions, FlinkPipelineOptions {
+ @Description("Path of the file to read from")
+ @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
+ String getInput();
+ void setInput(String value);
+
+ @Description("Path of the file to write to")
+ String getOutput();
+ void setOutput(String value);
+ }
+
+ public static void main(String[] args) {
+
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation()
+ .as(Options.class);
+ options.setRunner(FlinkPipelineRunner.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+ .apply(new CountWords())
+ .apply(MapElements.via(new FormatAsTextFn()))
+ .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+
+ p.run();
+ }
+
+}
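
ExtractWordsFn splits each line on the regex [^a-zA-Z']+, so punctuation and digits act as separators and a leading separator yields an empty first token; that is why the !word.isEmpty() guard is needed. A minimal sketch of the split behavior:

    import java.util.Arrays;

    public class SplitSketch {
      public static void main(String[] args) {
        // A leading separator produces an empty first element,
        // which ExtractWordsFn filters out.
        String[] words = " Hello, world! It's 2016.".split("[^a-zA-Z']+");
        System.out.println(Arrays.toString(words)); // [, Hello, world, It's]
      }
    }
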
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
new file mode 100644
index 0000000..8168122
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -0,0 +1,387 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.beam.runners.flink.examples.streaming;
+
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.*;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import org.joda.time.Duration;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * To run the example, first open a socket on a terminal by executing the command:
+ * <pre>
+ * nc -lk 9999
+ * </pre>
+ * and then launch the example. Now whatever you type in the terminal is going to be
+ * the input to the program.
+ */
+public class AutoComplete {
+
+ /**
+ * A PTransform that takes as input a list of tokens and returns
+ * the most common tokens per prefix.
+ */
+ public static class ComputeTopCompletions
+ extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> {
+ private static final long serialVersionUID = 0;
+
+ private final int candidatesPerPrefix;
+ private final boolean recursive;
+
+ protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.recursive = recursive;
+ }
+
+ public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) {
+ return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+ }
+
+ @Override
+ public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
+ PCollection<CompletionCandidate> candidates = input
+ // First count how often each token appears.
+ .apply(new Count.PerElement<String>())
+
+ // Map the KV outputs of Count into our own CompletionCandidate class.
+ .apply(ParDo.named("CreateCompletionCandidates").of(
+ new DoFn<KV<String, Long>, CompletionCandidate>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue());
+ c.output(cand);
+ }
+ }));
+
+ // Compute the top via either a flat or recursive algorithm.
+ if (recursive) {
+ return candidates
+ .apply(new ComputeTopRecursive(candidatesPerPrefix, 1))
+ .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
+ } else {
+ return candidates
+ .apply(new ComputeTopFlat(candidatesPerPrefix, 1));
+ }
+ }
+ }
+
+ /**
+ * Lower latency, but more expensive.
+ */
+ private static class ComputeTopFlat
+ extends PTransform<PCollection<CompletionCandidate>,
+ PCollection<KV<String, List<CompletionCandidate>>>> {
+ private static final long serialVersionUID = 0;
+
+ private final int candidatesPerPrefix;
+ private final int minPrefix;
+
+ public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.minPrefix = minPrefix;
+ }
+
+ @Override
+ public PCollection<KV<String, List<CompletionCandidate>>> apply(
+ PCollection<CompletionCandidate> input) {
+ return input
+ // For each completion candidate, map it to all prefixes.
+ .apply(ParDo.of(new AllPrefixes(minPrefix)))
+
+ // Find and return the top candidates for each prefix.
+ .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)
+ .withHotKeyFanout(new HotKeyFanout()));
+ }
+
+ private static class HotKeyFanout implements SerializableFunction<String, Integer> {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public Integer apply(String input) {
+ return (int) Math.pow(4, 5 - input.length());
+ }
+ }
+ }
+
+ /**
+ * Cheaper but higher latency.
+ *
+ * <p> Returns two PCollections: the first contains the top prefixes of size greater
+ * than minPrefix, and the second contains the top prefixes of size exactly
+ * minPrefix.
+ */
+ private static class ComputeTopRecursive
+ extends PTransform<PCollection<CompletionCandidate>,
+ PCollectionList<KV<String, List<CompletionCandidate>>>> {
+ private static final long serialVersionUID = 0;
+
+ private final int candidatesPerPrefix;
+ private final int minPrefix;
+
+ public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.minPrefix = minPrefix;
+ }
+
+ private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) {
+ return elem.getKey().length() > minPrefix ? 0 : 1;
+ }
+ }
+
+ private static class FlattenTops
+ extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ for (CompletionCandidate cc : c.element().getValue()) {
+ c.output(cc);
+ }
+ }
+ }
+
+ @Override
+ public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
+ PCollection<CompletionCandidate> input) {
+ if (minPrefix > 10) {
+ // Base case, partitioning to return the output in the expected format.
+ return input
+ .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix))
+ .apply(Partition.of(2, new KeySizePartitionFn()));
+ } else {
+ // If a candidate is in the top N for prefix a...b, it must also be in the top
+ // N for a...bX for every X, which is typically a much smaller set to consider.
+ // First, compute the top candidate for prefixes of size at least minPrefix + 1.
+ PCollectionList<KV<String, List<CompletionCandidate>>> larger = input
+ .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1));
+ // Consider the top candidates for each prefix of length minPrefix + 1...
+ PCollection<KV<String, List<CompletionCandidate>>> small =
+ PCollectionList
+ .of(larger.get(1).apply(ParDo.of(new FlattenTops())))
+ // ...together with those (previously excluded) candidates of length
+ // exactly minPrefix...
+ .and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public Boolean apply(CompletionCandidate c) {
+ return c.getValue().length() == minPrefix;
+ }
+ })))
+ .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections())
+ // ...set the key to be the minPrefix-length prefix...
+ .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix)))
+ // ...and (re)apply the Top operator to all of them together.
+ .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix));
+
+ PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger
+ .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
+
+ return PCollectionList.of(flattenLarger).and(small);
+ }
+ }
+ }
+
+ /**
+ * A DoFn that keys each candidate by all its prefixes.
+ */
+ private static class AllPrefixes
+ extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+ private static final long serialVersionUID = 0;
+
+ private final int minPrefix;
+ private final int maxPrefix;
+ public AllPrefixes(int minPrefix) {
+ this(minPrefix, Integer.MAX_VALUE);
+ }
+ public AllPrefixes(int minPrefix, int maxPrefix) {
+ this.minPrefix = minPrefix;
+ this.maxPrefix = maxPrefix;
+ }
+ @Override
+ public void processElement(ProcessContext c) {
+ String word = c.element().value;
+ for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
+ KV<String, CompletionCandidate> kv = KV.of(word.substring(0, i), c.element());
+ c.output(kv);
+ }
+ }
+ }
+
+ /**
+ * Class used to store tag-count pairs.
+ */
+ @DefaultCoder(AvroCoder.class)
+ static class CompletionCandidate implements Comparable<CompletionCandidate> {
+ private long count;
+ private String value;
+
+ public CompletionCandidate(String value, long count) {
+ this.value = value;
+ this.count = count;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ // Empty constructor required for Avro decoding.
+ @SuppressWarnings("unused")
+ public CompletionCandidate() {}
+
+ @Override
+ public int compareTo(CompletionCandidate o) {
+ if (this.count < o.count) {
+ return -1;
+ } else if (this.count == o.count) {
+ return this.value.compareTo(o.value);
+ } else {
+ return 1;
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof CompletionCandidate) {
+ CompletionCandidate that = (CompletionCandidate) other;
+ return this.count == that.count && this.value.equals(that.value);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.valueOf(count).hashCode() ^ value.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "CompletionCandidate[" + value + ", " + count + "]";
+ }
+ }
+
+ static class ExtractWordsFn extends DoFn<String, String> {
+ private final Aggregator<Long, Long> emptyLines =
+ createAggregator("emptyLines", new Sum.SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ if (c.element().trim().isEmpty()) {
+ emptyLines.addValue(1L);
+ }
+
+ // Split the line into words.
+ String[] words = c.element().split("[^a-zA-Z']+");
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }
+
+ /**
+ * Takes as input the top candidates per prefix, and emits a string
+ * suitable for writing to a per-task local file.
+ */
+ static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String>
+ implements DoFn.RequiresWindowAccess {
+
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ StringBuilder str = new StringBuilder();
+ KV<String, List<CompletionCandidate>> elem = c.element();
+
+ str.append(elem.getKey() + " @ " + c.window() + " -> ");
+ for (CompletionCandidate cand : elem.getValue()) {
+ str.append(cand.toString() + " ");
+ }
+ System.out.println(str.toString());
+ c.output(str.toString());
+ }
+ }
+
+ /**
+ * Options supported by this class.
+ *
+ * <p> Inherits standard Dataflow configuration options.
+ */
+ private interface Options extends WindowedWordCount.StreamingWordCountOptions {
+ @Description("Whether to use the recursive algorithm")
+ @Default.Boolean(true)
+ Boolean getRecursive();
+ void setRecursive(Boolean value);
+ }
+
+ public static void main(String[] args) throws IOException {
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ options.setStreaming(true);
+ options.setCheckpointingInterval(1000L);
+ options.setNumberOfExecutionRetries(5);
+ options.setExecutionRetryDelay(3000L);
+ options.setRunner(FlinkPipelineRunner.class);
+
+ PTransform<? super PBegin, PCollection<String>> readSource =
+ Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream");
+ WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+ // Create the pipeline.
+ Pipeline p = Pipeline.create(options);
+ PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
+ .apply(readSource)
+ .apply(ParDo.of(new ExtractWordsFn()))
+ .apply(Window.<String>into(windowFn)
+ .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes())
+ .apply(ComputeTopCompletions.top(10, options.getRecursive()));
+
+ toWrite
+ .apply(ParDo.named("FormatForPerTaskFile").of(new FormatForPerTaskLocalFile()))
+ .apply(TextIO.Write.to("./outputAutoComplete.txt"));
+
+ p.run();
+ }
+}
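
The recursive algorithm hinges on AllPrefixes, which keys each candidate by every prefix of length at least minPrefix. A minimal sketch of that expansion outside the pipeline (the word is illustrative):

    public class PrefixSketch {
      public static void main(String[] args) {
        String word = "flink";
        int minPrefix = 1;
        // Mirrors AllPrefixes: one (prefix, word) pair per prefix length.
        for (int i = minPrefix; i <= word.length(); i++) {
          System.out.println(word.substring(0, i) + " -> " + word);
        }
        // f -> flink, fl -> flink, fli -> flink, flin -> flink, flink -> flink
      }
    }
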
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
new file mode 100644
index 0000000..3a8bdb0
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.beam.runners.flink.examples.streaming;
+
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.joda.time.Duration;
+
+/**
+ * To run the example, first open two sockets on two terminals by executing the commands:
+ * <ul>
+ * <li><code>nc -lk 9999</code>, and</li>
+ * <li><code>nc -lk 9998</code></li>
+ * </ul>
+ * and then launch the example. Now whatever you type in either terminal is going to be
+ * the input to the program.
+ */
+public class JoinExamples {
+
+ static PCollection<String> joinEvents(PCollection<String> streamA,
+ PCollection<String> streamB) throws Exception {
+
+ final TupleTag<String> firstInfoTag = new TupleTag<>();
+ final TupleTag<String> secondInfoTag = new TupleTag<>();
+
+ // transform both input collections to tuple collections, where the key
+ // in both cases is the first word of the line (the join key).
+ PCollection<KV<String, String>> firstInfo = streamA.apply(
+ ParDo.of(new ExtractEventDataFn()));
+ PCollection<KV<String, String>> secondInfo = streamB.apply(
+ ParDo.of(new ExtractEventDataFn()));
+
+ // join key -> CoGbkResult (<lines from stream A>, <lines from stream B>)
+ PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
+ .of(firstInfoTag, firstInfo)
+ .and(secondInfoTag, secondInfo)
+ .apply(CoGroupByKey.<String>create());
+
+ // Process the CoGbkResult elements generated by the CoGroupByKey transform.
+ // join key -> string combining <line from stream A> and <line from stream B>
+ PCollection<KV<String, String>> finalResultCollection =
+ kvpCollection.apply(ParDo.named("Process").of(
+ new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ KV<String, CoGbkResult> e = c.element();
+ String key = e.getKey();
+
+ String defaultA = "NO_VALUE";
+
+ // the following getOnly is a bit tricky because it expects to have
+ // EXACTLY ONE value in the corresponding stream and for the corresponding key.
+
+ String lineA = e.getValue().getOnly(firstInfoTag, defaultA);
+ for (String lineB : c.element().getValue().getAll(secondInfoTag)) {
+ // Generate a string that combines information from both collection values
+ c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB));
+ }
+ }
+ }));
+
+ return finalResultCollection
+ .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ String result = c.element().getKey() + " -> " + c.element().getValue();
+ System.out.println(result);
+ c.output(result);
+ }
+ }));
+ }
+
+ static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ String line = c.element().toLowerCase();
+ String key = line.split("\\s")[0];
+ c.output(KV.of(key, line));
+ }
+ }
+
+ private interface Options extends WindowedWordCount.StreamingWordCountOptions {
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ options.setStreaming(true);
+ options.setCheckpointingInterval(1000L);
+ options.setNumberOfExecutionRetries(5);
+ options.setExecutionRetryDelay(3000L);
+ options.setRunner(FlinkPipelineRunner.class);
+
+ PTransform<? super PBegin, PCollection<String>> readSourceA =
+ Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream");
+ PTransform<? super PBegin, PCollection<String>> readSourceB =
+ Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream");
+
+ WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+ Pipeline p = Pipeline.create(options);
+
+ // the following two 'apply' calls create multiple inputs to our pipeline, one for each
+ // of our two input sources.
+ PCollection<String> streamA = p.apply(readSourceA)
+ .apply(Window.<String>into(windowFn)
+ .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes());
+ PCollection<String> streamB = p.apply(readSourceB)
+ .apply(Window.<String>into(windowFn)
+ .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes());
+
+ PCollection<String> formattedResults = joinEvents(streamA, streamB);
+ formattedResults.apply(TextIO.Write.to("./outputJoin.txt"));
+ p.run();
+ }
+
+}
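
The Process DoFn above combines two CoGbkResult accessors: getOnly(firstInfoTag, default), which expects at most one value per key from the first stream, and getAll(secondInfoTag), which iterates all values from the second. A sketch of the resulting output shape for a single key, with invented stand-in values:

    import java.util.Arrays;
    import java.util.List;

    public class JoinShapeSketch {
      public static void main(String[] args) {
        // Illustrative stand-ins for the two tagged streams under key "beam":
        String lineA = "beam first stream line";                   // getOnly side
        List<String> linesB = Arrays.asList("beam b1", "beam b2"); // getAll side

        // One output per element on the getAll side, as in the Process DoFn.
        for (String lineB : linesB) {
          System.out.println("beam -> Value A: " + lineA + " - Value B: " + lineB);
        }
      }
    }
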
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
new file mode 100644
index 0000000..fa0c8e9
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.examples.streaming;
+
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.joda.time.Duration;
+
+import java.util.Properties;
+
+public class KafkaWindowedWordCountExample {
+
+ static final String KAFKA_TOPIC = "test"; // Default kafka topic to read from
+ static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact
+ static final String GROUP_ID = "myGroup"; // Default groupId
+ static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka
+
+ public static class ExtractWordsFn extends DoFn<String, String> {
+ private final Aggregator<Long, Long> emptyLines =
+ createAggregator("emptyLines", new Sum.SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ if (c.element().trim().isEmpty()) {
+ emptyLines.addValue(1L);
+ }
+
+ // Split the line into words.
+ String[] words = c.element().split("[^a-zA-Z']+");
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }
+
+ public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
+ System.out.println(row);
+ c.output(row);
+ }
+ }
+
+ public interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions {
+ @Description("The Kafka topic to read from")
+ @Default.String(KAFKA_TOPIC)
+ String getKafkaTopic();
+
+ void setKafkaTopic(String value);
+
+ @Description("The Kafka Broker to read from")
+ @Default.String(KAFKA_BROKER)
+ String getBroker();
+
+ void setBroker(String value);
+
+ @Description("The Zookeeper server to connect to")
+ @Default.String(ZOOKEEPER)
+ String getZookeeper();
+
+ void setZookeeper(String value);
+
+ @Description("The groupId")
+ @Default.String(GROUP_ID)
+ String getGroup();
+
+ void setGroup(String value);
+
+ }
+
+ public static void main(String[] args) {
+ PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
+ KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
+ options.setJobName("KafkaExample");
+ options.setStreaming(true);
+ options.setCheckpointingInterval(1000L);
+ options.setNumberOfExecutionRetries(5);
+ options.setExecutionRetryDelay(3000L);
+ options.setRunner(FlinkPipelineRunner.class);
+
+ System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " " + options.getBroker() + " " + options.getGroup());
+ Pipeline pipeline = Pipeline.create(options);
+
+ Properties p = new Properties();
+ p.setProperty("zookeeper.connect", options.getZookeeper());
+ p.setProperty("bootstrap.servers", options.getBroker());
+ p.setProperty("group.id", options.getGroup());
+
+ // This is the Flink consumer that reads the input to
+ // the program from a Kafka topic.
+ FlinkKafkaConsumer082 kafkaConsumer = new FlinkKafkaConsumer082<>(
+ options.getKafkaTopic(),
+ new SimpleStringSchema(), p);
+
+ PCollection<String> words = pipeline
+ .apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount"))
+ .apply(ParDo.of(new ExtractWordsFn()))
+ .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
+ .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes());
+
+ PCollection<KV<String, Long>> wordCounts =
+ words.apply(Count.<String>perElement());
+
+ wordCounts.apply(ParDo.of(new FormatAsStringFn()))
+ .apply(TextIO.Write.to("./outputKafka.txt"));
+
+ pipeline.run();
+ }
+}
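
Since KafkaStreamingWordCountOptions follows the standard PipelineOptions getter naming, the defaults above can be overridden on the command line, for example:

    --kafkaTopic=myTopic --broker=localhost:9092 --zookeeper=localhost:2181 --group=myGroup
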
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
new file mode 100644
index 0000000..6af044d
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.examples.streaming;
+
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.*;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * To run the example, first open a socket on a terminal by executing the command:
+ * <pre>
+ * nc -lk 9999
+ * </pre>
+ * and then launch the example. Now whatever you type in the terminal is going to be
+ * the input to the program.
+ */
+public class WindowedWordCount {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
+
+ static final long WINDOW_SIZE = 10; // Default window duration in seconds
+ static final long SLIDE_SIZE = 5; // Default window slide in seconds
+
+ static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
+ c.output(row);
+ }
+ }
+
+ static class ExtractWordsFn extends DoFn<String, String> {
+ private final Aggregator<Long, Long> emptyLines =
+ createAggregator("emptyLines", new Sum.SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ if (c.element().trim().isEmpty()) {
+ emptyLines.addValue(1L);
+ }
+
+ // Split the line into words.
+ String[] words = c.element().split("[^a-zA-Z']+");
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }
+
+ public interface StreamingWordCountOptions extends org.apache.beam.runners.flink.examples.WordCount.Options {
+ @Description("Sliding window duration, in seconds")
+ @Default.Long(WINDOW_SIZE)
+ Long getWindowSize();
+
+ void setWindowSize(Long value);
+
+ @Description("Window slide, in seconds")
+ @Default.Long(SLIDE_SIZE)
+ Long getSlide();
+
+ void setSlide(Long value);
+ }
+
+ public static void main(String[] args) throws IOException {
+ StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class);
+ options.setStreaming(true);
+ options.setWindowSize(10L);
+ options.setSlide(5L);
+ options.setCheckpointingInterval(1000L);
+ options.setNumberOfExecutionRetries(5);
+ options.setExecutionRetryDelay(3000L);
+ options.setRunner(FlinkPipelineRunner.class);
+
+ LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() +
+ " sec. and a slide of " + options.getSlide());
+
+ Pipeline pipeline = Pipeline.create(options);
+
+ PCollection<String> words = pipeline
+ .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount"))
+ .apply(ParDo.of(new ExtractWordsFn()))
+ .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
+ .every(Duration.standardSeconds(options.getSlide())))
+ .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes());
+
+ PCollection<KV<String, Long>> wordCounts =
+ words.apply(Count.<String>perElement());
+
+ wordCounts.apply(ParDo.of(new FormatAsStringFn()))
+ .apply(TextIO.Write.to("./outputWordCount.txt"));
+
+ pipeline.run();
+ }
+}
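
With a 10-second window and a 5-second slide, each element is assigned to windowSize / slide = 2 overlapping windows. A minimal sketch of the window-assignment arithmetic (plain Java, illustrative only; the real assignment is done by SlidingWindows):

    public class SlidingWindowSketch {
      public static void main(String[] args) {
        long windowSize = 10; // seconds, as in WINDOW_SIZE
        long slide = 5;       // seconds, as in SLIDE_SIZE
        long timestamp = 17;  // an element arriving at t = 17s

        // An element at time t belongs to every window whose start is a
        // multiple of the slide and lies in (t - windowSize, t].
        for (long start = (timestamp / slide) * slide;
             start > timestamp - windowSize; start -= slide) {
          System.out.println("[" + start + "s, " + (start + windowSize) + "s)");
        }
        // Prints [15s, 25s) and [10s, 20s)
      }
    }
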
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
new file mode 100644
index 0000000..cd25ba3
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.io;
+
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+/**
+ * Transform for printing the contents of a {@link com.google.cloud.dataflow.sdk.values.PCollection}
+ * to standard output.
+ *
+ * This is Flink-specific and will only work when executed using the
+ * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}.
+ */
+public class ConsoleIO {
+
+ /**
+ * A PTransform that writes a PCollection to a standard output.
+ */
+ public static class Write {
+
+ /**
+ * Returns a ConsoleIO.Write PTransform with a default step name.
+ */
+ public static Bound create() {
+ return new Bound();
+ }
+
+ /**
+ * Returns a ConsoleIO.Write PTransform with the given step name.
+ */
+ public static Bound named(String name) {
+ return new Bound().named(name);
+ }
+
+ /**
+ * A PTransform that writes a bounded PCollection to standard output.
+ */
+ public static class Bound extends PTransform<PCollection<?>, PDone> {
+ private static final long serialVersionUID = 0;
+
+ Bound() {
+ super("ConsoleIO.Write");
+ }
+
+ Bound(String name) {
+ super(name);
+ }
+
+ /**
+ * Returns a new ConsoleIO.Write PTransform that's like this one but with the given
+ * step
+ * name. Does not modify this object.
+ */
+ public Bound named(String name) {
+ return new Bound(name);
+ }
+
+ @Override
+ public PDone apply(PCollection<?> input) {
+ return PDone.in(input.getPipeline());
+ }
+ }
+ }
+}
+
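Usage follows the other IO transforms; a minimal sketch, assuming an existing PCollection named 'words':

    // 'words' is assumed to be a PCollection<String> built elsewhere.
    words.apply(ConsoleIO.Write.named("PrintWords"));

Note that Bound.apply only returns PDone; the actual printing is wired in by the Flink translation layer, which is why the transform only works on the FlinkPipelineRunner.
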
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
new file mode 100644
index 0000000..5201423
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
+ * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator}.
+ */
+public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
+
+ /**
+ * The necessary context in the case of a batch job.
+ */
+ private final FlinkBatchTranslationContext batchContext;
+
+ private int depth = 0;
+
+ /**
+ * Composite transform that we want to translate before proceeding with other transforms.
+ */
+ private PTransform<?, ?> currentCompositeTransform;
+
+ public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
+ this.batchContext = new FlinkBatchTranslationContext(env, options);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Pipeline Visitor Methods
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void enterCompositeTransform(TransformTreeNode node) {
+ System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+
+ PTransform<?, ?> transform = node.getTransform();
+ if (transform != null && currentCompositeTransform == null) {
+
+ BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+ if (translator != null) {
+ currentCompositeTransform = transform;
+ if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) {
+ // we can only optimize CoGroupByKey for input size 2
+ currentCompositeTransform = null;
+ }
+ }
+ }
+ this.depth++;
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformTreeNode node) {
+ PTransform<?, ?> transform = node.getTransform();
+ if (transform != null && currentCompositeTransform == transform) {
+
+ BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+ if (translator != null) {
+ System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+ applyBatchTransform(transform, node, translator);
+ currentCompositeTransform = null;
+ } else {
+ throw new IllegalStateException("Attempted to translate composite transform " +
+ "but no translator was found: " + currentCompositeTransform);
+ }
+ }
+ this.depth--;
+ System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+ }
+
+ @Override
+ public void visitTransform(TransformTreeNode node) {
+ System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+ if (currentCompositeTransform != null) {
+ // ignore it
+ return;
+ }
+
+ // get the transformation corresponding to the node we are
+ // currently visiting and translate it into its Flink alternative.
+
+ PTransform<?, ?> transform = node.getTransform();
+ BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+ if (translator == null) {
+ System.out.println(node.getTransform().getClass());
+ throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
+ }
+ applyBatchTransform(transform, node, translator);
+ }
+
+ @Override
+ public void visitValue(PValue value, TransformTreeNode producer) {
+ // do nothing here
+ }
+
+ private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) {
+
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) transform;
+
+ @SuppressWarnings("unchecked")
+ BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
+
+ // create the applied PTransform on the batchContext
+ batchContext.setCurrentTransform(AppliedPTransform.of(
+ node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
+ typedTranslator.translateNode(typedTransform, batchContext);
+ }
+
+ /**
+ * A translator of a {@link PTransform}.
+ */
+ public interface BatchTransformTranslator<Type extends PTransform> {
+ void translateNode(Type transform, FlinkBatchTranslationContext context);
+ }
+
+ private static String genSpaces(int n) {
+ StringBuilder s = new StringBuilder();
+ for (int i = 0; i < n; i++) {
+ s.append("| ");
+ }
+ return s.toString();
+ }
+
+ private static String formatNodeName(TransformTreeNode node) {
+ return node.toString().split("@")[1] + node.getTransform();
+ }
+}
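
A translator registered with FlinkBatchTransformTranslators implements the single-method BatchTransformTranslator interface above. A hypothetical sketch of the shape such a translator takes (this is not the runner's actual Flatten translator; the class is invented for illustration):

    import com.google.cloud.dataflow.sdk.transforms.Flatten;
    import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator;
    import org.apache.beam.runners.flink.translation.FlinkBatchTranslationContext;

    class FlattenTranslatorSketch implements
        FlinkBatchPipelineTranslator.BatchTransformTranslator<Flatten.FlattenPCollectionList<String>> {
      @Override
      public void translateNode(Flatten.FlattenPCollectionList<String> transform,
                                FlinkBatchTranslationContext context) {
        // A real translator would look up the Flink DataSets for the
        // transform's inputs on the context, build the equivalent Flink
        // operator (a union, in Flatten's case), and register the result
        // as the translation of the transform's output.
      }
    }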