You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:03 UTC
[08/50] [abbrv] incubator-beam git commit: [runner] add streaming
support with checkpointing
[runner] add streaming support with checkpointing
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/edff0785
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/edff0785
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/edff0785
Branch: refs/heads/master
Commit: edff0785a82d2c6c01abeb3c64ca0d2958ccd0fd
Parents: 517c1bd
Author: Kostas Kloudas <kk...@gmail.com>
Authored: Wed Dec 9 17:30:53 2015 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800
----------------------------------------------------------------------
runners/flink/pom.xml | 28 +
.../dataflow/FlinkJobExecutionEnvironment.java | 238 ++++++++
.../flink/dataflow/FlinkPipelineRunner.java | 99 +--
.../flink/dataflow/examples/WordCount.java | 2 +-
.../examples/streaming/AutoComplete.java | 384 ++++++++++++
.../examples/streaming/JoinExamples.java | 157 +++++
.../KafkaWindowedWordCountExample.java | 138 +++++
.../examples/streaming/WindowedWordCount.java | 126 ++++
.../FlinkBatchPipelineTranslator.java | 152 +++++
.../FlinkBatchTransformTranslators.java | 593 ++++++++++++++++++
.../FlinkBatchTranslationContext.java | 129 ++++
.../translation/FlinkPipelineTranslator.java | 145 +----
.../FlinkStreamingPipelineTranslator.java | 138 +++++
.../FlinkStreamingTransformTranslators.java | 356 +++++++++++
.../FlinkStreamingTranslationContext.java | 86 +++
.../translation/FlinkTransformTranslators.java | 594 ------------------
.../translation/TranslationContext.java | 129 ----
.../translation/types/CoderComparator.java | 216 +++++++
.../translation/types/CoderComperator.java | 218 -------
.../translation/types/CoderTypeInformation.java | 6 +-
.../translation/types/CoderTypeSerializer.java | 2 -
.../translation/types/KvCoderComperator.java | 2 +-
.../types/VoidCoderTypeSerializer.java | 1 -
.../translation/wrappers/SourceInputFormat.java | 4 +-
.../streaming/FlinkAbstractParDoWrapper.java | 274 +++++++++
.../FlinkGroupAlsoByWindowWrapper.java | 601 +++++++++++++++++++
.../streaming/FlinkGroupByKeyWrapper.java | 56 ++
.../streaming/FlinkParDoBoundMultiWrapper.java | 72 +++
.../streaming/FlinkParDoBoundWrapper.java | 89 +++
.../streaming/io/UnboundedFlinkSource.java | 76 +++
.../streaming/io/UnboundedSocketSource.java | 228 +++++++
.../streaming/io/UnboundedSourceWrapper.java | 120 ++++
.../state/AbstractFlinkTimerInternals.java | 139 +++++
.../streaming/state/FlinkStateInternals.java | 533 ++++++++++++++++
.../streaming/state/StateCheckpointReader.java | 89 +++
.../streaming/state/StateCheckpointUtils.java | 152 +++++
.../streaming/state/StateCheckpointWriter.java | 127 ++++
.../wrappers/streaming/state/StateType.java | 67 +++
.../streaming/GroupAlsoByWindowTest.java | 507 ++++++++++++++++
.../streaming/StateSerializationTest.java | 257 ++++++++
.../flink/dataflow/util/JoinExamples.java | 4 +-
41 files changed, 6157 insertions(+), 1177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 6102d74..14693b8 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -71,6 +71,18 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
@@ -114,6 +126,22 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
new file mode 100644
index 0000000..66d60fa
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow;
+
+import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator;
+import com.dataartisans.flink.dataflow.translation.FlinkBatchPipelineTranslator;
+import com.dataartisans.flink.dataflow.translation.FlinkStreamingPipelineTranslator;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class FlinkJobExecutionEnvironment {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkJobExecutionEnvironment.class);
+
+ private final FlinkPipelineOptions options;
+
+ /**
+ * The Flink Batch execution environment. This is instantiated to either a
+ * {@link org.apache.flink.api.java.CollectionEnvironment},
+ * a {@link org.apache.flink.api.java.LocalEnvironment} or
+ * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
+ * options.
+ */
+ private ExecutionEnvironment flinkBatchEnv;
+
+
+ /**
+ * The Flink Streaming execution environment. This is instantiated to either a
+ * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
+ * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
+ * on the configuration options, and more specifically, the url of the master url.
+ */
+ private StreamExecutionEnvironment flinkStreamEnv;
+
+ /**
+ * Translator for this FlinkPipelineRunner. Its role is to translate the Dataflow operators to
+ * their Flink based counterparts. Based on the options provided by the user, if we have a streaming job,
+ * this is instantiated to a FlinkStreamingPipelineTranslator. In other case, i.e. a batch job,
+ * a FlinkBatchPipelineTranslator is created.
+ */
+ private FlinkPipelineTranslator flinkPipelineTranslator;
+
+ public FlinkJobExecutionEnvironment(FlinkPipelineOptions options) {
+ if (options == null) {
+ throw new IllegalArgumentException("Options in the FlinkJobExecutionEnvironment cannot be NULL.");
+ }
+ this.options = options;
+ this.createJobEnvironment();
+ this.createJobGraphTranslator();
+ }
+
+ /**
+ * Depending on the type of job (Streaming or Batch) and the user-specified options,
+ * this method creates the adequate ExecutionEnvironment.
+ */
+ private void createJobEnvironment() {
+ if (options.isStreaming()) {
+ LOG.info("Creating the required STREAMING Environment.");
+ createStreamExecutionEnvironment();
+ } else {
+ LOG.info("Creating the required BATCH Environment.");
+ createBatchExecutionEnvironment();
+ }
+ }
+
+ /**
+ * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph
+ * translator. In the case of batch, it will work with DataSets, while for streaming, it will work
+ * with DataStreams.
+ */
+ private void createJobGraphTranslator() {
+ checkInitializationState();
+ if (this.flinkPipelineTranslator != null) {
+ throw new IllegalStateException("JobGraphTranslator already initialized.");
+ }
+
+ this.flinkPipelineTranslator = options.isStreaming() ?
+ new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
+ new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
+ }
+
+ public void translate(Pipeline pipeline) {
+ checkInitializationState();
+ if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
+ createJobEnvironment();
+ }
+ if (this.flinkPipelineTranslator == null) {
+ createJobGraphTranslator();
+ }
+ this.flinkPipelineTranslator.translate(pipeline);
+ }
+
+ public JobExecutionResult executeJob() throws Exception {
+ if (options.isStreaming()) {
+
+ System.out.println("Plan: " + this.flinkStreamEnv.getExecutionPlan());
+
+ if (this.flinkStreamEnv == null) {
+ throw new RuntimeException("JobExecutionEnvironment not initialized.");
+ }
+ if (this.flinkPipelineTranslator == null) {
+ throw new RuntimeException("JobGraphTranslator not initialized.");
+ }
+ return this.flinkStreamEnv.execute();
+ } else {
+ if (this.flinkBatchEnv == null) {
+ throw new RuntimeException("JobExecutionEnvironment not initialized.");
+ }
+ if (this.flinkPipelineTranslator == null) {
+ throw new RuntimeException("JobGraphTranslator not initialized.");
+ }
+ return this.flinkBatchEnv.execute();
+ }
+ }
+
+ /**
+ * If the submitted job is a batch processing job, this method creates the adequate
+ * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
+ * on the user-specified options.
+ */
+ private void createBatchExecutionEnvironment() {
+ if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+ throw new RuntimeException("JobExecutionEnvironment already initialized.");
+ }
+
+ String masterUrl = options.getFlinkMaster();
+ this.flinkStreamEnv = null;
+
+ // depending on the master, create the right environment.
+ if (masterUrl.equals("[local]")) {
+ this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+ } else if (masterUrl.equals("[collection]")) {
+ this.flinkBatchEnv = new CollectionEnvironment();
+ } else if (masterUrl.equals("[auto]")) {
+ this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+ } else if (masterUrl.matches(".*:\\d*")) {
+ String[] parts = masterUrl.split(":");
+ List<String> stagingFiles = options.getFilesToStage();
+ this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
+ Integer.parseInt(parts[1]),
+ stagingFiles.toArray(new String[stagingFiles.size()]));
+ } else {
+ LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+ this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ // set the correct parallelism.
+ if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) {
+ this.flinkBatchEnv.setParallelism(options.getParallelism());
+ }
+
+ // set parallelism in the options (required by some execution code)
+ options.setParallelism(flinkBatchEnv.getParallelism());
+ }
+
+ /**
+ * If the submitted job is a stream processing job, this method creates the adequate
+ * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
+ * on the user-specified options.
+ */
+ private void createStreamExecutionEnvironment() {
+ if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+ throw new RuntimeException("JobExecutionEnvironment already initialized.");
+ }
+
+ String masterUrl = options.getFlinkMaster();
+ this.flinkBatchEnv = null;
+
+ // depending on the master, create the right environment.
+ if (masterUrl.equals("[local]")) {
+ this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+ } else if (masterUrl.equals("[auto]")) {
+ this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ } else if (masterUrl.matches(".*:\\d*")) {
+ String[] parts = masterUrl.split(":");
+ List<String> stagingFiles = options.getFilesToStage();
+ this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
+ Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()]));
+ } else {
+ LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+ this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ // set the correct parallelism.
+ if (options.getParallelism() != -1) {
+ this.flinkStreamEnv.setParallelism(options.getParallelism());
+ }
+
+ // set parallelism in the options (required by some execution code)
+ options.setParallelism(flinkStreamEnv.getParallelism());
+
+ // although we do not use the generated timestamps,
+ // enabling timestamps is needed for the watermarks.
+ this.flinkStreamEnv.getConfig().enableTimestamps();
+
+ this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ this.flinkStreamEnv.enableCheckpointing(1000);
+ this.flinkStreamEnv.setNumberOfExecutionRetries(5);
+
+ LOG.info("Setting execution retry delay to 3 sec");
+ this.flinkStreamEnv.getConfig().setExecutionRetryDelay(3000);
+ }
+
+ private final void checkInitializationState() {
+ if (this.options == null) {
+ throw new IllegalStateException("FlinkJobExecutionEnvironment is not initialized yet.");
+ }
+
+ if (options.isStreaming() && this.flinkBatchEnv != null) {
+ throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment.");
+ } else if (!options.isStreaming() && this.flinkStreamEnv != null) {
+ throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
index ae31f48..f57fed2 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
@@ -15,7 +15,6 @@
*/
package com.dataartisans.flink.dataflow;
-import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
@@ -28,8 +27,6 @@ import com.google.cloud.dataflow.sdk.values.POutput;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.CollectionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,27 +42,19 @@ import java.util.Map;
* A {@link PipelineRunner} that executes the operations in the
* pipeline by first translating them to a Flink Plan and then executing them either locally
* or on a Flink cluster, depending on the configuration.
- *
+ * <p>
* This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}.
*/
public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);
- /** Provided options. */
- private final FlinkPipelineOptions options;
-
/**
- * The Flink execution environment. This is instantiated to either a
- * {@link org.apache.flink.api.java.CollectionEnvironment},
- * a {@link org.apache.flink.api.java.LocalEnvironment} or
- * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
- * options.
+ * Provided options.
*/
- private final ExecutionEnvironment flinkEnv;
+ private final FlinkPipelineOptions options;
- /** Translator for this FlinkPipelineRunner, based on options. */
- private final FlinkPipelineTranslator translator;
+ private final FlinkJobExecutionEnvironment flinkJobEnv;
/**
* Construct a runner from the provided options.
@@ -109,90 +98,38 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
flinkOptions.setFlinkMaster("[auto]");
}
- if (flinkOptions.isStreaming()) {
- throw new RuntimeException("Streaming is currently not supported.");
- }
-
return new FlinkPipelineRunner(flinkOptions);
}
private FlinkPipelineRunner(FlinkPipelineOptions options) {
this.options = options;
- this.flinkEnv = createExecutionEnvironment(options);
-
- // set parallelism in the options (required by some execution code)
- options.setParallelism(flinkEnv.getParallelism());
-
- this.translator = new FlinkPipelineTranslator(flinkEnv, options);
- }
-
- /**
- * Create Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
- * on the options.
- */
- private ExecutionEnvironment createExecutionEnvironment(FlinkPipelineOptions options) {
- String masterUrl = options.getFlinkMaster();
-
-
- if (masterUrl.equals("[local]")) {
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- if (options.getParallelism() != -1) {
- env.setParallelism(options.getParallelism());
- }
- return env;
- } else if (masterUrl.equals("[collection]")) {
- return new CollectionEnvironment();
- } else if (masterUrl.equals("[auto]")) {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- if (options.getParallelism() != -1) {
- env.setParallelism(options.getParallelism());
- }
- return env;
- } else if (masterUrl.matches(".*:\\d*")) {
- String[] parts = masterUrl.split(":");
- List<String> stagingFiles = options.getFilesToStage();
- ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(parts[0],
- Integer.parseInt(parts[1]),
- stagingFiles.toArray(new String[stagingFiles.size()]));
- if (options.getParallelism() != -1) {
- env.setParallelism(options.getParallelism());
- }
- return env;
- } else {
- LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- if (options.getParallelism() != -1) {
- env.setParallelism(options.getParallelism());
- }
- return env;
- }
+ this.flinkJobEnv = new FlinkJobExecutionEnvironment(options);
}
@Override
public FlinkRunnerResult run(Pipeline pipeline) {
LOG.info("Executing pipeline using FlinkPipelineRunner.");
-
+
LOG.info("Translating pipeline to Flink program.");
-
- translator.translate(pipeline);
-
+
+ this.flinkJobEnv.translate(pipeline);
+
LOG.info("Starting execution of Flink program.");
JobExecutionResult result;
try {
- result = flinkEnv.execute();
- }
- catch (Exception e) {
+ result = this.flinkJobEnv.executeJob();
+ } catch (Exception e) {
LOG.error("Pipeline execution failed", e);
throw new RuntimeException("Pipeline execution failed", e);
}
-
+
LOG.info("Execution finished in {} msecs", result.getNetRuntime());
-
+
Map<String, Object> accumulators = result.getAllAccumulatorResults();
if (accumulators != null && !accumulators.isEmpty()) {
LOG.info("Final aggregator values:");
-
+
for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
LOG.info("{} : {}", entry.getKey(), entry.getValue());
}
@@ -230,16 +167,18 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
/////////////////////////////////////////////////////////////////////////////
@Override
- public String toString() { return "DataflowPipelineRunner#" + hashCode(); }
+ public String toString() {
+ return "DataflowPipelineRunner#" + hashCode();
+ }
/**
* Attempts to detect all the resources the class loader has access to. This does not recurse
* to class loader parents stopping it from pulling in resources from the system class loader.
*
* @param classLoader The URLClassLoader to use to detect resources to stage.
- * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
- * of the resources the class loader exposes is not a file resource.
* @return A list of absolute paths to the resources the class loader uses.
+ * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
+ * of the resources the class loader exposes is not a file resource.
*/
protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
if (!(classLoader instanceof URLClassLoader)) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
index 82f1e46..7857778 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
@@ -43,7 +43,7 @@ public class WordCount {
String getOutput();
void setOutput(String value);
}
-
+
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
new file mode 100644
index 0000000..0245a7b
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
@@ -0,0 +1,384 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.dataartisans.flink.dataflow.examples.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.*;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import org.joda.time.Duration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * To run the example, first open a socket on a terminal by executing the command:
+ * <li>
+ * <li>
+ * <code>nc -lk 9999</code>
+ * </li>
+ * </li>
+ * and then launch the example. Now whatever you type in the terminal is going to be
+ * the input to the program.
+ * */
+public class AutoComplete {
+
+ /**
+ * A PTransform that takes as input a list of tokens and returns
+ * the most common tokens per prefix.
+ */
+ public static class ComputeTopCompletions
+ extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> {
+ private static final long serialVersionUID = 0;
+
+ private final int candidatesPerPrefix;
+ private final boolean recursive;
+
+ protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.recursive = recursive;
+ }
+
+ public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) {
+ return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+ }
+
+ @Override
+ public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
+ PCollection<CompletionCandidate> candidates = input
+ // First count how often each token appears.
+ .apply(new Count.PerElement<String>())
+
+ // Map the KV outputs of Count into our own CompletionCandiate class.
+ .apply(ParDo.named("CreateCompletionCandidates").of(
+ new DoFn<KV<String, Long>, CompletionCandidate>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue());
+ c.output(cand);
+ }
+ }));
+
+ // Compute the top via either a flat or recursive algorithm.
+ if (recursive) {
+ return candidates
+ .apply(new ComputeTopRecursive(candidatesPerPrefix, 1))
+ .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
+ } else {
+ return candidates
+ .apply(new ComputeTopFlat(candidatesPerPrefix, 1));
+ }
+ }
+ }
+
+ /**
+ * Lower latency, but more expensive.
+ */
+ private static class ComputeTopFlat
+ extends PTransform<PCollection<CompletionCandidate>,
+ PCollection<KV<String, List<CompletionCandidate>>>> {
+ private static final long serialVersionUID = 0;
+
+ private final int candidatesPerPrefix;
+ private final int minPrefix;
+
+ public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.minPrefix = minPrefix;
+ }
+
+ @Override
+ public PCollection<KV<String, List<CompletionCandidate>>> apply(
+ PCollection<CompletionCandidate> input) {
+ return input
+ // For each completion candidate, map it to all prefixes.
+ .apply(ParDo.of(new AllPrefixes(minPrefix)))
+
+ // Find and return the top candiates for each prefix.
+ .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)
+ .withHotKeyFanout(new HotKeyFanout()));
+ }
+
+ private static class HotKeyFanout implements SerializableFunction<String, Integer> {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public Integer apply(String input) {
+ return (int) Math.pow(4, 5 - input.length());
+ }
+ }
+ }
+
+ /**
+ * Cheaper but higher latency.
+ *
+ * <p> Returns two PCollections, the first is top prefixes of size greater
+ * than minPrefix, and the second is top prefixes of size exactly
+ * minPrefix.
+ */
+ private static class ComputeTopRecursive
+ extends PTransform<PCollection<CompletionCandidate>,
+ PCollectionList<KV<String, List<CompletionCandidate>>>> {
+ private static final long serialVersionUID = 0;
+
+ private final int candidatesPerPrefix;
+ private final int minPrefix;
+
+ public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.minPrefix = minPrefix;
+ }
+
+ private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) {
+ return elem.getKey().length() > minPrefix ? 0 : 1;
+ }
+ }
+
+ private static class FlattenTops
+ extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ for (CompletionCandidate cc : c.element().getValue()) {
+ c.output(cc);
+ }
+ }
+ }
+
+ @Override
+ public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
+ PCollection<CompletionCandidate> input) {
+ if (minPrefix > 10) {
+ // Base case, partitioning to return the output in the expected format.
+ return input
+ .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix))
+ .apply(Partition.of(2, new KeySizePartitionFn()));
+ } else {
+ // If a candidate is in the top N for prefix a...b, it must also be in the top
+ // N for a...bX for every X, which is typlically a much smaller set to consider.
+ // First, compute the top candidate for prefixes of size at least minPrefix + 1.
+ PCollectionList<KV<String, List<CompletionCandidate>>> larger = input
+ .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1));
+ // Consider the top candidates for each prefix of length minPrefix + 1...
+ PCollection<KV<String, List<CompletionCandidate>>> small =
+ PCollectionList
+ .of(larger.get(1).apply(ParDo.of(new FlattenTops())))
+ // ...together with those (previously excluded) candidates of length
+ // exactly minPrefix...
+ .and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public Boolean apply(CompletionCandidate c) {
+ return c.getValue().length() == minPrefix;
+ }
+ })))
+ .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections())
+ // ...set the key to be the minPrefix-length prefix...
+ .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix)))
+ // ...and (re)apply the Top operator to all of them together.
+ .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix));
+
+ PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger
+ .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
+
+ return PCollectionList.of(flattenLarger).and(small);
+ }
+ }
+ }
+
+ /**
+ * A DoFn that keys each candidate by all its prefixes.
+ */
+ private static class AllPrefixes
+ extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+ private static final long serialVersionUID = 0;
+
+ private final int minPrefix;
+ private final int maxPrefix;
+ public AllPrefixes(int minPrefix) {
+ this(minPrefix, Integer.MAX_VALUE);
+ }
+ public AllPrefixes(int minPrefix, int maxPrefix) {
+ this.minPrefix = minPrefix;
+ this.maxPrefix = maxPrefix;
+ }
+ @Override
+ public void processElement(ProcessContext c) {
+ String word = c.element().value;
+ for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
+ KV kv = KV.of(word.substring(0, i), c.element());
+ c.output(kv);
+ }
+ }
+ }
+
+ /**
+ * Class used to store tag-count pairs.
+ */
+ @DefaultCoder(AvroCoder.class)
+ static class CompletionCandidate implements Comparable<CompletionCandidate> {
+ private long count;
+ private String value;
+
+ public CompletionCandidate(String value, long count) {
+ this.value = value;
+ this.count = count;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ // Empty constructor required for Avro decoding.
+ @SuppressWarnings("unused")
+ public CompletionCandidate() {}
+
+ @Override
+ public int compareTo(CompletionCandidate o) {
+ if (this.count < o.count) {
+ return -1;
+ } else if (this.count == o.count) {
+ return this.value.compareTo(o.value);
+ } else {
+ return 1;
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof CompletionCandidate) {
+ CompletionCandidate that = (CompletionCandidate) other;
+ return this.count == that.count && this.value.equals(that.value);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.valueOf(count).hashCode() ^ value.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "CompletionCandidate[" + value + ", " + count + "]";
+ }
+ }
+
+ static class ExtractWordsFn extends DoFn<String, String> {
+ private final Aggregator<Long, Long> emptyLines =
+ createAggregator("emptyLines", new Sum.SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ if (c.element().trim().isEmpty()) {
+ emptyLines.addValue(1L);
+ }
+
+ // Split the line into words.
+ String[] words = c.element().split("[^a-zA-Z']+");
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }
+
+ /**
+ * Takes as input a the top candidates per prefix, and emits an entity
+ * suitable for writing to Datastore.
+ */
+ static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String> {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ StringBuilder str = new StringBuilder();
+ KV<String, List<CompletionCandidate>> elem = c.element();
+
+ str.append(elem.getKey() +" @ "+ c.window() +" -> ");
+ for(CompletionCandidate cand: elem.getValue()) {
+ str.append(cand.toString() + " ");
+ }
+ System.out.println(str.toString());
+ c.output(str.toString());
+ }
+ }
+
+ /**
+ * Options supported by this class.
+ *
+ * <p> Inherits standard Dataflow configuration options.
+ */
+ private static interface Options extends WindowedWordCount.StreamingWordCountOptions {
+ @Description("Whether to use the recursive algorithm")
+ @Default.Boolean(true)
+ Boolean getRecursive();
+ void setRecursive(Boolean value);
+ }
+
+ public static void main(String[] args) throws IOException {
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ options.setStreaming(true);
+ options.setRunner(FlinkPipelineRunner.class);
+
+ PTransform<? super PBegin, PCollection<String>> readSource =
+ Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream");
+ WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+ // Create the pipeline.
+ Pipeline p = Pipeline.create(options);
+ PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
+ .apply(readSource)
+ .apply(ParDo.of(new ExtractWordsFn()))
+ .apply(Window.<String>into(windowFn)
+ .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes())
+ .apply(ComputeTopCompletions.top(10, options.getRecursive()));
+
+ toWrite
+ .apply(ParDo.named("FormatForPerTaskFile").of(new FormatForPerTaskLocalFile()))
+ .apply(TextIO.Write.to("./outputAutoComplete.txt"));
+
+ p.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
new file mode 100644
index 0000000..b0cc4fa
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.dataartisans.flink.dataflow.examples.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.joda.time.Duration;
+
+/**
+ * To run the example, first open two sockets on two terminals by executing the commands:
+ * <li>
+ * <li>
+ * <code>nc -lk 9999</code>, and
+ * </li>
+ * <li>
+ * <code>nc -lk 9998</code>
+ * </li>
+ * </li>
+ * and then launch the example. Now whatever you type in the terminal is going to be
+ * the input to the program.
+ * */
+public class JoinExamples {
+
+ static PCollection<String> joinEvents(PCollection<String> streamA,
+ PCollection<String> streamB) throws Exception {
+
+ final TupleTag<String> firstInfoTag = new TupleTag<String>();
+ final TupleTag<String> secondInfoTag = new TupleTag<String>();
+
+ // transform both input collections to tuple collections, where the keys are country
+ // codes in both cases.
+ PCollection<KV<String, String>> firstInfo = streamA.apply(
+ ParDo.of(new ExtractEventDataFn()));
+ PCollection<KV<String, String>> secondInfo = streamB.apply(
+ ParDo.of(new ExtractEventDataFn()));
+
+ // country code 'key' -> CGBKR (<event info>, <country name>)
+ PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
+ .of(firstInfoTag, firstInfo)
+ .and(secondInfoTag, secondInfo)
+ .apply(CoGroupByKey.<String>create());
+
+ // Process the CoGbkResult elements generated by the CoGroupByKey transform.
+ // country code 'key' -> string of <event info>, <country name>
+ PCollection<KV<String, String>> finalResultCollection =
+ kvpCollection.apply(ParDo.named("Process").of(
+ new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ KV<String, CoGbkResult> e = c.element();
+ String key = e.getKey();
+
+ String defaultA = "NO_VALUE";
+
+ // the following getOnly is a bit tricky because it expects to have
+ // EXACTLY ONE value in the corresponding stream and for the corresponding key.
+
+ String lineA = e.getValue().getOnly(firstInfoTag, defaultA);
+ for (String lineB : c.element().getValue().getAll(secondInfoTag)) {
+ // Generate a string that combines information from both collection values
+ c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB));
+ }
+ }
+ }));
+
+ return finalResultCollection
+ .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ String result = c.element().getKey() + " -> " + c.element().getValue();
+ System.out.println(result);
+ c.output(result);
+ }
+ }));
+ }
+
+ static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ String line = c.element().toLowerCase();
+ String key = line.split("\\s")[0];
+ c.output(KV.of(key, line));
+ }
+ }
+
+ private static interface Options extends WindowedWordCount.StreamingWordCountOptions {
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+ // make it a streaming example.
+ options.setStreaming(true);
+ options.setRunner(FlinkPipelineRunner.class);
+
+ PTransform<? super PBegin, PCollection<String>> readSourceA =
+ Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream");
+ PTransform<? super PBegin, PCollection<String>> readSourceB =
+ Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream");
+
+ WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+ Pipeline p = Pipeline.create(options);
+
+ // the following two 'applys' create multiple inputs to our pipeline, one for each
+ // of our two input sources.
+ PCollection<String> streamA = p.apply(readSourceA)
+ .apply(Window.<String>into(windowFn)
+ .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes());
+ PCollection<String> streamB = p.apply(readSourceB)
+ .apply(Window.<String>into(windowFn)
+ .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes());
+
+ PCollection<String> formattedResults = joinEvents(streamA, streamB);
+ formattedResults.apply(TextIO.Write.to("./outputJoin.txt"));
+ p.run();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
new file mode 100644
index 0000000..46c9bd6
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.examples.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedFlinkSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.joda.time.Duration;
+
+import java.util.Properties;
+
+public class KafkaWindowedWordCountExample {
+
+ static final String KAFKA_TOPIC = "test"; // Default kafka topic to read from
+ static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact
+ static final String GROUP_ID = "myGroup"; // Default groupId
+ static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka
+
+ public static class ExtractWordsFn extends DoFn<String, String> {
+ private final Aggregator<Long, Long> emptyLines =
+ createAggregator("emptyLines", new Sum.SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ if (c.element().trim().isEmpty()) {
+ emptyLines.addValue(1L);
+ }
+
+ // Split the line into words.
+ String[] words = c.element().split("[^a-zA-Z']+");
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }
+
+ public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
+ System.out.println(row);
+ c.output(row);
+ }
+ }
+
+ public static interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions {
+ @Description("The Kafka topic to read from")
+ @Default.String(KAFKA_TOPIC)
+ String getKafkaTopic();
+
+ void setKafkaTopic(String value);
+
+ @Description("The Kafka Broker to read from")
+ @Default.String(KAFKA_BROKER)
+ String getBroker();
+
+ void setBroker(String value);
+
+ @Description("The Zookeeper server to connect to")
+ @Default.String(ZOOKEEPER)
+ String getZookeeper();
+
+ void setZookeeper(String value);
+
+ @Description("The groupId")
+ @Default.String(GROUP_ID)
+ String getGroup();
+
+ void setGroup(String value);
+
+ }
+
+ public static void main(String[] args) {
+ PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
+ KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
+ options.setJobName("KafkaExample");
+ options.setStreaming(true);
+ options.setRunner(FlinkPipelineRunner.class);
+
+ System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() );
+ Pipeline pipeline = Pipeline.create(options);
+
+ Properties p = new Properties();
+ p.setProperty("zookeeper.connect", options.getZookeeper());
+ p.setProperty("bootstrap.servers", options.getBroker());
+ p.setProperty("group.id", options.getGroup());
+
+ // this is the Flink consumer that reads the input to
+ // the program from a kafka topic.
+ FlinkKafkaConsumer082 kafkaConsumer = new FlinkKafkaConsumer082<>(
+ options.getKafkaTopic(),
+ new SimpleStringSchema(), p);
+
+ PCollection<String> words = pipeline
+ .apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount"))
+ .apply(ParDo.of(new ExtractWordsFn()))
+ .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
+ .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes());
+
+ PCollection<KV<String, Long>> wordCounts =
+ words.apply(Count.<String>perElement());
+
+ wordCounts.apply(ParDo.of(new FormatAsStringFn()))
+ .apply(TextIO.Write.to("./outputKafka.txt"));
+
+ pipeline.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
new file mode 100644
index 0000000..1d4a44b
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.examples.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.*;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * To run the example, first open a socket on a terminal by executing the command:
+ * <li>
+ * <li>
+ * <code>nc -lk 9999</code>
+ * </li>
+ * </li>
+ * and then launch the example. Now whatever you type in the terminal is going to be
+ * the input to the program.
+ * */
+public class WindowedWordCount {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
+
+ static final long WINDOW_SIZE = 10; // Default window duration in seconds
+ static final long SLIDE_SIZE = 5; // Default window slide in seconds
+
+ static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
+ c.output(row);
+ }
+ }
+
+ static class ExtractWordsFn extends DoFn<String, String> {
+ private final Aggregator<Long, Long> emptyLines =
+ createAggregator("emptyLines", new Sum.SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ if (c.element().trim().isEmpty()) {
+ emptyLines.addValue(1L);
+ }
+
+ // Split the line into words.
+ String[] words = c.element().split("[^a-zA-Z']+");
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }
+
+ public static interface StreamingWordCountOptions extends com.dataartisans.flink.dataflow.examples.WordCount.Options {
+ @Description("Sliding window duration, in seconds")
+ @Default.Long(WINDOW_SIZE)
+ Long getWindowSize();
+
+ void setWindowSize(Long value);
+
+ @Description("Window slide, in seconds")
+ @Default.Long(SLIDE_SIZE)
+ Long getSlide();
+
+ void setSlide(Long value);
+ }
+
+ public static void main(String[] args) throws IOException {
+ StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class);
+ options.setStreaming(true);
+ options.setWindowSize(10L);
+ options.setSlide(5L);
+ options.setRunner(FlinkPipelineRunner.class);
+
+ LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() +
+ " sec. and a slide of " + options.getSlide());
+
+ Pipeline pipeline = Pipeline.create(options);
+
+ PCollection<String> words = pipeline
+ .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount"))
+ .apply(ParDo.of(new ExtractWordsFn()))
+ .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
+ .every(Duration.standardSeconds(options.getSlide())))
+ .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes());
+
+ PCollection<KV<String, Long>> wordCounts =
+ words.apply(Count.<String>perElement());
+
+ wordCounts.apply(ParDo.of(new FormatAsStringFn()))
+ .apply(TextIO.Write.to("./outputWordCount.txt"));
+
+ pipeline.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
new file mode 100644
index 0000000..8c0183e
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
+ * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator}
+ */
+public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
+
+ /**
+ * The necessary context in the case of a batch job.
+ */
+ private final FlinkBatchTranslationContext batchContext;
+
+ private int depth = 0;
+
+ /**
+ * Composite transform that we want to translate before proceeding with other transforms.
+ */
+ private PTransform<?, ?> currentCompositeTransform;
+
+ public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
+ this.batchContext = new FlinkBatchTranslationContext(env, options);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Pipeline Visitor Methods
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void enterCompositeTransform(TransformTreeNode node) {
+ System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+
+ PTransform<?, ?> transform = node.getTransform();
+ if (transform != null && currentCompositeTransform == null) {
+
+ BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+ if (translator != null) {
+ currentCompositeTransform = transform;
+ if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) {
+ // we can only optimize CoGroupByKey for input size 2
+ currentCompositeTransform = null;
+ }
+ }
+ }
+ this.depth++;
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformTreeNode node) {
+ PTransform<?, ?> transform = node.getTransform();
+ if (transform != null && currentCompositeTransform == transform) {
+
+ BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+ if (translator != null) {
+ System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+ applyBatchTransform(transform, node, translator);
+ currentCompositeTransform = null;
+ } else {
+ throw new IllegalStateException("Attempted to translate composite transform " +
+ "but no translator was found: " + currentCompositeTransform);
+ }
+ }
+ this.depth--;
+ System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+ }
+
+ @Override
+ public void visitTransform(TransformTreeNode node) {
+ System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+ if (currentCompositeTransform != null) {
+ // ignore it
+ return;
+ }
+
+ // get the transformation corresponding to hte node we are
+ // currently visiting and translate it into its Flink alternative.
+
+ PTransform<?, ?> transform = node.getTransform();
+ BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+ if (translator == null) {
+ System.out.println(node.getTransform().getClass());
+ throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
+ }
+ applyBatchTransform(transform, node, translator);
+ }
+
+ @Override
+ public void visitValue(PValue value, TransformTreeNode producer) {
+ // do nothing here
+ }
+
+ private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) {
+ if (this.batchContext == null) {
+ throw new IllegalStateException("The FlinkPipelineTranslator is not yet initialized.");
+ }
+
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) transform;
+
+ @SuppressWarnings("unchecked")
+ BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
+
+ // create the applied PTransform on the batchContext
+ batchContext.setCurrentTransform(AppliedPTransform.of(
+ node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
+ typedTranslator.translateNode(typedTransform, batchContext);
+ }
+
+ /**
+ * A translator of a {@link PTransform}.
+ */
+ public interface BatchTransformTranslator<Type extends PTransform> {
+ void translateNode(Type transform, FlinkBatchTranslationContext context);
+ }
+
+ private static String genSpaces(int n) {
+ String s = "";
+ for (int i = 0; i < n; i++) {
+ s += "| ";
+ }
+ return s;
+ }
+
+ private static String formatNodeName(TransformTreeNode node) {
+ return node.toString().split("@")[1] + node.getTransform();
+ }
+}