You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:03 UTC

[08/50] [abbrv] incubator-beam git commit: [runner] add streaming support with checkpointing

[runner] add streaming support with checkpointing


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/edff0785
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/edff0785
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/edff0785

Branch: refs/heads/master
Commit: edff0785a82d2c6c01abeb3c64ca0d2958ccd0fd
Parents: 517c1bd
Author: Kostas Kloudas <kk...@gmail.com>
Authored: Wed Dec 9 17:30:53 2015 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 runners/flink/pom.xml                           |  28 +
 .../dataflow/FlinkJobExecutionEnvironment.java  | 238 ++++++++
 .../flink/dataflow/FlinkPipelineRunner.java     |  99 +--
 .../flink/dataflow/examples/WordCount.java      |   2 +-
 .../examples/streaming/AutoComplete.java        | 384 ++++++++++++
 .../examples/streaming/JoinExamples.java        | 157 +++++
 .../KafkaWindowedWordCountExample.java          | 138 +++++
 .../examples/streaming/WindowedWordCount.java   | 126 ++++
 .../FlinkBatchPipelineTranslator.java           | 152 +++++
 .../FlinkBatchTransformTranslators.java         | 593 ++++++++++++++++++
 .../FlinkBatchTranslationContext.java           | 129 ++++
 .../translation/FlinkPipelineTranslator.java    | 145 +----
 .../FlinkStreamingPipelineTranslator.java       | 138 +++++
 .../FlinkStreamingTransformTranslators.java     | 356 +++++++++++
 .../FlinkStreamingTranslationContext.java       |  86 +++
 .../translation/FlinkTransformTranslators.java  | 594 ------------------
 .../translation/TranslationContext.java         | 129 ----
 .../translation/types/CoderComparator.java      | 216 +++++++
 .../translation/types/CoderComperator.java      | 218 -------
 .../translation/types/CoderTypeInformation.java |   6 +-
 .../translation/types/CoderTypeSerializer.java  |   2 -
 .../translation/types/KvCoderComperator.java    |   2 +-
 .../types/VoidCoderTypeSerializer.java          |   1 -
 .../translation/wrappers/SourceInputFormat.java |   4 +-
 .../streaming/FlinkAbstractParDoWrapper.java    | 274 +++++++++
 .../FlinkGroupAlsoByWindowWrapper.java          | 601 +++++++++++++++++++
 .../streaming/FlinkGroupByKeyWrapper.java       |  56 ++
 .../streaming/FlinkParDoBoundMultiWrapper.java  |  72 +++
 .../streaming/FlinkParDoBoundWrapper.java       |  89 +++
 .../streaming/io/UnboundedFlinkSource.java      |  76 +++
 .../streaming/io/UnboundedSocketSource.java     | 228 +++++++
 .../streaming/io/UnboundedSourceWrapper.java    | 120 ++++
 .../state/AbstractFlinkTimerInternals.java      | 139 +++++
 .../streaming/state/FlinkStateInternals.java    | 533 ++++++++++++++++
 .../streaming/state/StateCheckpointReader.java  |  89 +++
 .../streaming/state/StateCheckpointUtils.java   | 152 +++++
 .../streaming/state/StateCheckpointWriter.java  | 127 ++++
 .../wrappers/streaming/state/StateType.java     |  67 +++
 .../streaming/GroupAlsoByWindowTest.java        | 507 ++++++++++++++++
 .../streaming/StateSerializationTest.java       | 257 ++++++++
 .../flink/dataflow/util/JoinExamples.java       |   4 +-
 41 files changed, 6157 insertions(+), 1177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 6102d74..14693b8 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -71,6 +71,18 @@
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java</artifactId>
 			<version>${flink.version}</version>
 		</dependency>
@@ -114,6 +126,22 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.mockito</groupId>
+			<artifactId>mockito-all</artifactId>
+			<version>1.9.5</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
new file mode 100644
index 0000000..66d60fa
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow;
+
+import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator;
+import com.dataartisans.flink.dataflow.translation.FlinkBatchPipelineTranslator;
+import com.dataartisans.flink.dataflow.translation.FlinkStreamingPipelineTranslator;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class FlinkJobExecutionEnvironment {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkJobExecutionEnvironment.class);
+
+	private final FlinkPipelineOptions options;
+
+	/**
+	 * The Flink Batch execution environment. This is instantiated to either a
+	 * {@link org.apache.flink.api.java.CollectionEnvironment},
+	 * a {@link org.apache.flink.api.java.LocalEnvironment} or
+	 * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
+	 * options.
+	 */
+	private ExecutionEnvironment flinkBatchEnv;
+
+
+	/**
+	 * The Flink Streaming execution environment. This is instantiated to either a
+	 * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
+	 * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
+	 * on the configuration options, and more specifically, the URL of the Flink master.
+	 */
+	private StreamExecutionEnvironment flinkStreamEnv;
+
+	/**
+	 * Translator for this FlinkPipelineRunner. Its role is to translate the Dataflow operators to
+	 * their Flink based counterparts. Based on the options provided by the user, if we have a streaming job,
+	 * this is instantiated to a FlinkStreamingPipelineTranslator. Otherwise, i.e. for a batch job,
+	 * a FlinkBatchPipelineTranslator is created.
+	 */
+	private FlinkPipelineTranslator flinkPipelineTranslator;
+
+	public FlinkJobExecutionEnvironment(FlinkPipelineOptions options) {
+		if (options == null) {
+			throw new IllegalArgumentException("Options in the FlinkJobExecutionEnvironment cannot be NULL.");
+		}
+		this.options = options;
+		this.createJobEnvironment(); // must run first: the translator below is constructed around this environment
+		this.createJobGraphTranslator();
+	}
+
+	/**
+	 * Builds the Flink execution environment for this job: the streaming one when
+	 * the options request a streaming job, the batch one otherwise.
+	 */
+	private void createJobEnvironment() {
+		if (!options.isStreaming()) {
+			LOG.info("Creating the required BATCH Environment.");
+			createBatchExecutionEnvironment();
+			return;
+		}
+		LOG.info("Creating the required STREAMING Environment.");
+		createStreamExecutionEnvironment();
+	}
+
+	/**
+	 * Instantiates the pipeline translator for the job type: a streaming translator working with
+	 * DataStreams, or a batch translator working with DataSets. Throws if one already exists.
+	 */
+	private void createJobGraphTranslator() {
+		checkInitializationState();
+		if (this.flinkPipelineTranslator != null) {
+			throw new IllegalStateException("JobGraphTranslator already initialized.");
+		}
+		if (options.isStreaming()) {
+			this.flinkPipelineTranslator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options);
+		} else {
+			this.flinkPipelineTranslator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
+		}
+	}
+
+	public void translate(Pipeline pipeline) {
+		checkInitializationState(); // fails if the existing environment contradicts options.isStreaming()
+		if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) { // no environment yet: create it on demand
+			createJobEnvironment();
+		}
+		if (this.flinkPipelineTranslator == null) { // likewise create the translator on demand
+			createJobGraphTranslator();
+		}
+		this.flinkPipelineTranslator.translate(pipeline);
+	}
+
+	/** Executes the translated job on the streaming or batch environment, as selected by the options. */
+	public JobExecutionResult executeJob() throws Exception {
+		if (options.isStreaming()) {
+			// validate before calling getExecutionPlan(), which would otherwise NPE on a missing environment.
+			if (this.flinkStreamEnv == null) {
+				throw new RuntimeException("JobExecutionEnvironment not initialized.");
+			}
+			if (this.flinkPipelineTranslator == null) {
+				throw new RuntimeException("JobGraphTranslator not initialized.");
+			}
+			LOG.info("Plan: {}", this.flinkStreamEnv.getExecutionPlan());
+			return this.flinkStreamEnv.execute();
+		} else {
+			if (this.flinkBatchEnv == null) {
+				throw new RuntimeException("JobExecutionEnvironment not initialized.");
+			}
+			if (this.flinkPipelineTranslator == null) {
+				throw new RuntimeException("JobGraphTranslator not initialized.");
+			}
+			return this.flinkBatchEnv.execute();
+		}
+	}
+
+	/**
+	 * Creates the batch {@link org.apache.flink.api.java.ExecutionEnvironment} selected by the
+	 * Flink master option: [local], [collection], [auto] or host:port (split at the last ':').
+	 * Any other value, including an empty host or port, falls back to [auto] with a warning.
+	 */
+	private void createBatchExecutionEnvironment() {
+		if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+			throw new RuntimeException("JobExecutionEnvironment already initialized.");
+		}
+
+		String masterUrl = options.getFlinkMaster();
+		this.flinkStreamEnv = null;
+
+		// depending on the master, create the right environment.
+		if (masterUrl.equals("[local]")) {
+			this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+		} else if (masterUrl.equals("[collection]")) {
+			this.flinkBatchEnv = new CollectionEnvironment();
+		} else if (masterUrl.equals("[auto]")) {
+			this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+		} else if (masterUrl.matches(".+:\\d+")) {
+			int portSeparator = masterUrl.lastIndexOf(':'); // last ':' so that only the port digits are parsed
+			List<String> stagingFiles = options.getFilesToStage();
+			this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(masterUrl.substring(0, portSeparator),
+					Integer.parseInt(masterUrl.substring(portSeparator + 1)),
+					stagingFiles.toArray(new String[stagingFiles.size()]));
+		} else {
+			LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+			this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+		}
+
+		// set the correct parallelism.
+		if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) {
+			this.flinkBatchEnv.setParallelism(options.getParallelism());
+		}
+
+		// set parallelism in the options (required by some execution code)
+		options.setParallelism(flinkBatchEnv.getParallelism());
+	}
+
+	/**
+	 * Creates the {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} selected
+	 * by the Flink master option ([local], [auto] or host:port, split at the last ':'; anything else
+	 * falls back to [auto]) and enables event time, checkpointing and execution retries on it.
+	 */
+	private void createStreamExecutionEnvironment() {
+		if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+			throw new RuntimeException("JobExecutionEnvironment already initialized.");
+		}
+
+		String masterUrl = options.getFlinkMaster();
+		this.flinkBatchEnv = null;
+
+		// depending on the master, create the right environment.
+		if (masterUrl.equals("[local]")) {
+			this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+		} else if (masterUrl.equals("[auto]")) {
+			this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		} else if (masterUrl.matches(".+:\\d+")) {
+			int portSeparator = masterUrl.lastIndexOf(':'); // last ':' so that only the port digits are parsed
+			List<String> stagingFiles = options.getFilesToStage();
+			this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(masterUrl.substring(0, portSeparator),
+					Integer.parseInt(masterUrl.substring(portSeparator + 1)), stagingFiles.toArray(new String[stagingFiles.size()]));
+		} else {
+			LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+			this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		}
+
+		// set the correct parallelism.
+		if (options.getParallelism() != -1) {
+			this.flinkStreamEnv.setParallelism(options.getParallelism());
+		}
+
+		// set parallelism in the options (required by some execution code)
+		options.setParallelism(flinkStreamEnv.getParallelism());
+
+		// although we do not use the generated timestamps,
+		// enabling timestamps is needed for the watermarks.
+		this.flinkStreamEnv.getConfig().enableTimestamps();
+
+		this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		this.flinkStreamEnv.enableCheckpointing(1000);
+		this.flinkStreamEnv.setNumberOfExecutionRetries(5);
+
+		LOG.info("Setting execution retry delay to 3 sec");
+		this.flinkStreamEnv.getConfig().setExecutionRetryDelay(3000);
+	}
+
+	private final void checkInitializationState() {
+		if (this.options == null) {
+			throw new IllegalStateException("FlinkJobExecutionEnvironment is not initialized yet.");
+		}
+		// an existing environment of the other kind than the options request is rejected.
+		if (options.isStreaming() && this.flinkBatchEnv != null) {
+			throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment.");
+		} else if (!options.isStreaming() && this.flinkStreamEnv != null) {
+			throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
index ae31f48..f57fed2 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
@@ -15,7 +15,6 @@
  */
 package com.dataartisans.flink.dataflow;
 
-import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator;
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
@@ -28,8 +27,6 @@ import com.google.cloud.dataflow.sdk.values.POutput;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.CollectionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,27 +42,19 @@ import java.util.Map;
  * A {@link PipelineRunner} that executes the operations in the
  * pipeline by first translating them to a Flink Plan and then executing them either locally
  * or on a Flink cluster, depending on the configuration.
- *
+ * <p>
  * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}.
  */
 public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);
 
-	/** Provided options. */
-	private final FlinkPipelineOptions options;
-
 	/**
-	 * The Flink execution environment. This is instantiated to either a
-	 * {@link org.apache.flink.api.java.CollectionEnvironment},
-	 * a {@link org.apache.flink.api.java.LocalEnvironment} or
-	 * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
-	 * options.
+	 * Provided options.
 	 */
-	private final ExecutionEnvironment flinkEnv;
+	private final FlinkPipelineOptions options;
 
-	/** Translator for this FlinkPipelineRunner, based on options. */
-	private final FlinkPipelineTranslator translator;
+	private final FlinkJobExecutionEnvironment flinkJobEnv;
 
 	/**
 	 * Construct a runner from the provided options.
@@ -109,90 +98,38 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 			flinkOptions.setFlinkMaster("[auto]");
 		}
 
-		if (flinkOptions.isStreaming()) {
-			throw new RuntimeException("Streaming is currently not supported.");
-		}
-
 		return new FlinkPipelineRunner(flinkOptions);
 	}
 
 	private FlinkPipelineRunner(FlinkPipelineOptions options) {
 		this.options = options;
-		this.flinkEnv = createExecutionEnvironment(options);
-
-		// set parallelism in the options (required by some execution code)
-		options.setParallelism(flinkEnv.getParallelism());
-
-		this.translator = new FlinkPipelineTranslator(flinkEnv, options);
-	}
-
-	/**
-	 * Create Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
-	 * on the options.
-	 */
-	private ExecutionEnvironment createExecutionEnvironment(FlinkPipelineOptions options) {
-		String masterUrl = options.getFlinkMaster();
-
-
-		if (masterUrl.equals("[local]")) {
-			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-			if (options.getParallelism() != -1) {
-				env.setParallelism(options.getParallelism());
-			}
-			return env;
-		} else if (masterUrl.equals("[collection]")) {
-			return new CollectionEnvironment();
-		} else if (masterUrl.equals("[auto]")) {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			if (options.getParallelism() != -1) {
-				env.setParallelism(options.getParallelism());
-			}
-			return env;
-		} else if (masterUrl.matches(".*:\\d*")) {
-			String[] parts = masterUrl.split(":");
-			List<String> stagingFiles = options.getFilesToStage();
-			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(parts[0],
-					Integer.parseInt(parts[1]),
-					stagingFiles.toArray(new String[stagingFiles.size()]));
-			if (options.getParallelism() != -1) {
-				env.setParallelism(options.getParallelism());
-			}
-			return env;
-		} else {
-			LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			if (options.getParallelism() != -1) {
-				env.setParallelism(options.getParallelism());
-			}
-			return env;
-		}
+		this.flinkJobEnv = new FlinkJobExecutionEnvironment(options);
 	}
 
 	@Override
 	public FlinkRunnerResult run(Pipeline pipeline) {
 		LOG.info("Executing pipeline using FlinkPipelineRunner.");
-		
+
 		LOG.info("Translating pipeline to Flink program.");
-		
-		translator.translate(pipeline);
-		
+
+		this.flinkJobEnv.translate(pipeline);
+
 		LOG.info("Starting execution of Flink program.");
 		
 		JobExecutionResult result;
 		try {
-			result = flinkEnv.execute();
-		}
-		catch (Exception e) {
+			result = this.flinkJobEnv.executeJob();
+		} catch (Exception e) {
 			LOG.error("Pipeline execution failed", e);
 			throw new RuntimeException("Pipeline execution failed", e);
 		}
-		
+
 		LOG.info("Execution finished in {} msecs", result.getNetRuntime());
-		
+
 		Map<String, Object> accumulators = result.getAllAccumulatorResults();
 		if (accumulators != null && !accumulators.isEmpty()) {
 			LOG.info("Final aggregator values:");
-			
+
 			for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
 				LOG.info("{} : {}", entry.getKey(), entry.getValue());
 			}
@@ -230,16 +167,18 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 	/////////////////////////////////////////////////////////////////////////////
 
 	@Override
-	public String toString() { return "DataflowPipelineRunner#" + hashCode(); }
+	public String toString() {
+		return "DataflowPipelineRunner#" + hashCode();
+	}
 
 	/**
 	 * Attempts to detect all the resources the class loader has access to. This does not recurse
 	 * to class loader parents stopping it from pulling in resources from the system class loader.
 	 *
 	 * @param classLoader The URLClassLoader to use to detect resources to stage.
-	 * @throws IllegalArgumentException  If either the class loader is not a URLClassLoader or one
-	 * of the resources the class loader exposes is not a file resource.
 	 * @return A list of absolute paths to the resources the class loader uses.
+	 * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
+	 *                                  of the resources the class loader exposes is not a file resource.
 	 */
 	protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
 		if (!(classLoader instanceof URLClassLoader)) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
index 82f1e46..7857778 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java
@@ -43,7 +43,7 @@ public class WordCount {
 		String getOutput();
 		void setOutput(String value);
 	}
-	
+
 	public static void main(String[] args) {
 
 		Options options = PipelineOptionsFactory.fromArgs(args).withValidation()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
new file mode 100644
index 0000000..0245a7b
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
@@ -0,0 +1,384 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.dataartisans.flink.dataflow.examples.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.*;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import org.joda.time.Duration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * To run the example, first open a socket in a terminal by executing the command:
+ * <ul>
+ *     <li>
+ *     <code>nc -lk 9999</code>
+ *     </li>
+ * </ul>
+ * and then launch the example. Whatever you type in the terminal then becomes
+ * the input to the program.
+ */
+public class AutoComplete {
+
+  /**
+   * A PTransform that counts the input tokens and returns, for each prefix, the
+   * candidatesPerPrefix most common tokens, via the flat or the recursive algorithm.
+   */
+  public static class ComputeTopCompletions
+      extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> {
+    private static final long serialVersionUID = 0;
+
+    private final int candidatesPerPrefix;
+    private final boolean recursive;
+
+    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.recursive = recursive;
+    }
+
+    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) {
+      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+    }
+
+    @Override
+    public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
+      PCollection<CompletionCandidate> candidates = input
+        // First count how often each token appears.
+        .apply(new Count.PerElement<String>())
+
+        // Map the KV outputs of Count into our own CompletionCandidate class.
+        .apply(ParDo.named("CreateCompletionCandidates").of(
+            new DoFn<KV<String, Long>, CompletionCandidate>() {
+              private static final long serialVersionUID = 0;
+
+              @Override
+              public void processElement(ProcessContext c) {
+                CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue());
+                c.output(cand);
+              }
+            }));
+
+      // Compute the top via either a flat or recursive algorithm.
+      if (recursive) {
+        return candidates
+          .apply(new ComputeTopRecursive(candidatesPerPrefix, 1))
+          .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
+      } else {
+        return candidates
+          .apply(new ComputeTopFlat(candidatesPerPrefix, 1));
+      }
+    }
+  }
+
+  /**
+   * Keys every candidate by all of its prefixes and applies Top once: lower latency, but more expensive.
+   */
+  private static class ComputeTopFlat
+      extends PTransform<PCollection<CompletionCandidate>,
+                         PCollection<KV<String, List<CompletionCandidate>>>> {
+    private static final long serialVersionUID = 0;
+
+    private final int candidatesPerPrefix;
+    private final int minPrefix;
+
+    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.minPrefix = minPrefix;
+    }
+
+    @Override
+    public PCollection<KV<String, List<CompletionCandidate>>> apply(
+        PCollection<CompletionCandidate> input) {
+      return input
+        // For each completion candidate, map it to all prefixes.
+        .apply(ParDo.of(new AllPrefixes(minPrefix)))
+
+        // Find and return the top candidates for each prefix.
+        .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)
+             .withHotKeyFanout(new HotKeyFanout()));
+    }
+
+    private static class HotKeyFanout implements SerializableFunction<String, Integer> {
+      private static final long serialVersionUID = 0;
+
+      @Override
+      public Integer apply(String input) {
+        return (int) Math.pow(4, 5 - input.length()); // 4^(5 - key length), truncated: shorter keys get more fanout
+      }
+    }
+  }
+
+  /**
+   * Cheaper but higher latency.
+   *
+   * <p> Returns two PCollections, the first is top prefixes of size greater
+   * than minPrefix, and the second is top prefixes of size exactly
+   * minPrefix.
+   */
+  private static class ComputeTopRecursive
+      extends PTransform<PCollection<CompletionCandidate>,
+                         PCollectionList<KV<String, List<CompletionCandidate>>>> {
+    private static final long serialVersionUID = 0;
+
+    private final int candidatesPerPrefix;
+    private final int minPrefix;
+
+    public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.minPrefix = minPrefix;
+    }
+
+    private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> { // inner class: reads the enclosing minPrefix
+      private static final long serialVersionUID = 0;
+
+      @Override
+      public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) {
+        return elem.getKey().length() > minPrefix ? 0 : 1; // partition 0: keys longer than minPrefix; partition 1: all others
+      }
+    }
+
+    private static class FlattenTops
+        extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+      private static final long serialVersionUID = 0;
+
+      @Override
+      public void processElement(ProcessContext c) {
+        for (CompletionCandidate cc : c.element().getValue()) {
+          c.output(cc);
+        }
+      }
+    }
+
+    @Override
+    public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
+          PCollection<CompletionCandidate> input) {
+        if (minPrefix > 10) {
+          // Base case, partitioning to return the output in the expected format.
+          return input
+            .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix))
+            .apply(Partition.of(2, new KeySizePartitionFn()));
+        } else {
+          // If a candidate is in the top N for prefix a...b, it must also be in the top
+          // N for a...bX for every X, which is typically a much smaller set to consider.
+          // First, compute the top candidate for prefixes of size at least minPrefix + 1.
+          PCollectionList<KV<String, List<CompletionCandidate>>> larger = input
+            .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1));
+          // Consider the top candidates for each prefix of length minPrefix + 1...
+          PCollection<KV<String, List<CompletionCandidate>>> small =
+            PCollectionList
+            .of(larger.get(1).apply(ParDo.of(new FlattenTops())))
+            // ...together with those (previously excluded) candidates of length
+            // exactly minPrefix...
+            .and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() {
+                    private static final long serialVersionUID = 0;
+
+                    @Override
+                    public Boolean apply(CompletionCandidate c) {
+                      return c.getValue().length() == minPrefix;
+                    }
+                  })))
+            .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections())
+            // ...set the key to be the minPrefix-length prefix...
+            .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix)))
+            // ...and (re)apply the Top operator to all of them together.
+            .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix));
+
+          PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger
+              .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
+
+          return PCollectionList.of(flattenLarger).and(small);
+        }
+    }
+  }
+
+  /**
+   * A DoFn that keys each candidate by all its prefixes.
+   *
+   * <p>For every input candidate one {@code KV} is emitted per prefix of the candidate's value
+   * whose length lies between {@code minPrefix} and {@code maxPrefix} (both inclusive, capped at
+   * the length of the value). Values shorter than {@code minPrefix} produce no output.
+   */
+  private static class AllPrefixes
+      extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+    private static final long serialVersionUID = 0;
+
+    private final int minPrefix;
+    private final int maxPrefix;
+
+    /** Creates a function that emits every prefix of at least {@code minPrefix} characters. */
+    public AllPrefixes(int minPrefix) {
+      this(minPrefix, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Creates a function that emits the prefixes whose length is between {@code minPrefix}
+     * and {@code maxPrefix}, both inclusive.
+     */
+    public AllPrefixes(int minPrefix, int maxPrefix) {
+      this.minPrefix = minPrefix;
+      this.maxPrefix = maxPrefix;
+    }
+
+    @Override
+    public void processElement(ProcessContext c) {
+      CompletionCandidate candidate = c.element();
+      String word = candidate.value;
+      // The upper bound does not change while iterating, so compute it only once.
+      int longestPrefix = Math.min(word.length(), maxPrefix);
+      for (int i = minPrefix; i <= longestPrefix; i++) {
+        // Emit a fully typed KV (the former raw KV made this call unchecked).
+        c.output(KV.of(word.substring(0, i), candidate));
+      }
+    }
+  }
+
+  /**
+   * Class used to store tag-count pairs.
+   *
+   * <p>Candidates are ordered by ascending count; candidates with the same count are ordered
+   * by their value.
+   */
+  @DefaultCoder(AvroCoder.class)
+  static class CompletionCandidate implements Comparable<CompletionCandidate> {
+    private long count;
+    private String value;
+
+    // Empty constructor required for Avro decoding.
+    @SuppressWarnings("unused")
+    public CompletionCandidate() {}
+
+    public CompletionCandidate(String value, long count) {
+      this.value = value;
+      this.count = count;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    @Override
+    public int compareTo(CompletionCandidate o) {
+      if (count != o.count) {
+        return count < o.count ? -1 : 1;
+      }
+      // Same count: fall back to the natural order of the values.
+      return value.compareTo(o.value);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof CompletionCandidate)) {
+        return false;
+      }
+      CompletionCandidate that = (CompletionCandidate) other;
+      return count == that.count && value.equals(that.value);
+    }
+
+    @Override
+    public int hashCode() {
+      // Same value as Long.hashCode() of the count, combined with the hash of the value.
+      int countHash = (int) (count ^ (count >>> 32));
+      return countHash ^ value.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return "CompletionCandidate[" + value + ", " + count + "]";
+    }
+  }
+
+  /**
+   * Splits every input line into words and emits each non-empty word.
+   *
+   * <p>A word is a maximal run of ASCII letters and apostrophes. Lines that are empty after
+   * trimming are counted in the {@code emptyLines} aggregator and produce no output.
+   */
+  static class ExtractWordsFn extends DoFn<String, String> {
+    // Declared explicitly, like every other DoFn in this file.
+    private static final long serialVersionUID = 0;
+
+    private final Aggregator<Long, Long> emptyLines =
+            createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      String line = c.element();
+      if (line.trim().isEmpty()) {
+        emptyLines.addValue(1L);
+        // A blank line cannot contain a word, so there is nothing to split.
+        return;
+      }
+
+      // Split the line into words and output each word encountered.
+      for (String word : line.split("[^a-zA-Z']+")) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  /**
+   * Takes as input the top candidates per prefix and emits one line of text of the form
+   * {@code <prefix> @ <window> -> <candidate> <candidate> ...}.
+   *
+   * <p>Every line is also printed to standard output before it is emitted.
+   */
+  static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String> {
+    private static final long serialVersionUID = 0;
+
+    @Override
+    public void processElement(ProcessContext c) {
+      KV<String, List<CompletionCandidate>> elem = c.element();
+
+      // Chain append() calls instead of concatenating strings inside append().
+      StringBuilder str = new StringBuilder();
+      str.append(elem.getKey()).append(" @ ").append(c.window()).append(" -> ");
+      for (CompletionCandidate cand : elem.getValue()) {
+        str.append(cand.toString()).append(' ');
+      }
+
+      String line = str.toString();
+      System.out.println(line);
+      c.output(line);
+    }
+  }
+
+  /**
+   * Options supported by this class.
+   *
+   * <p>Extends the streaming word-count options with a switch that selects the algorithm.
+   */
+  private interface Options extends WindowedWordCount.StreamingWordCountOptions {
+    @Description("Whether to use the recursive algorithm")
+    @Default.Boolean(true)
+    Boolean getRecursive();
+
+    void setRecursive(Boolean value);
+  }
+
+  /**
+   * Builds and runs the streaming auto-complete pipeline: lines read from a socket on
+   * localhost:9999 are split into words, windowed, reduced to the top 10 completions per
+   * prefix, formatted and written to {@code ./outputAutoComplete.txt}.
+   */
+  public static void main(String[] args) throws IOException {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    options.setStreaming(true);
+    options.setRunner(FlinkPipelineRunner.class);
+
+    PTransform<? super PBegin, PCollection<String>> readSource =
+            Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream");
+    // Fixed windows whose size in seconds is taken from the options.
+    WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+    // Create the pipeline and wire all stages in a single chain.
+    Pipeline p = Pipeline.create(options);
+    p.apply(readSource)
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .apply(Window.<String>into(windowFn)
+              .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+            .discardingFiredPanes())
+      .apply(ComputeTopCompletions.top(10, options.getRecursive()))
+      .apply(ParDo.named("FormatForPerTaskFile").of(new FormatForPerTaskLocalFile()))
+      .apply(TextIO.Write.to("./outputAutoComplete.txt"));
+
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
new file mode 100644
index 0000000..b0cc4fa
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.dataartisans.flink.dataflow.examples.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.joda.time.Duration;
+
+/**
+ * To run the example, first open two sockets on two terminals by executing the commands:
+ * <ul>
+ *     <li>
+ *         <code>nc -lk 9999</code>, and
+ *     </li>
+ *     <li>
+ *         <code>nc -lk 9998</code>
+ *     </li>
+ * </ul>
+ * and then launch the example. Now whatever you type in the terminals is going to be
+ * the input to the program.
+ */
+public class JoinExamples {
+
+	/**
+	 * Joins the lines of two streams on the key produced by {@link ExtractEventDataFn} and
+	 * returns one formatted string per matching pair.
+	 *
+	 * <p>Each result has the form {@code <key> -> Value A: <lineA> - Value B: <lineB>} and is
+	 * also printed to standard output.
+	 */
+	static PCollection<String> joinEvents(PCollection<String> streamA,
+										  PCollection<String> streamB) throws Exception {
+
+		final TupleTag<String> tagA = new TupleTag<String>();
+		final TupleTag<String> tagB = new TupleTag<String>();
+
+		// Turn both inputs into key/value collections that share the same kind of key.
+		PCollection<KV<String, String>> keyedA = streamA.apply(
+				ParDo.of(new ExtractEventDataFn()));
+		PCollection<KV<String, String>> keyedB = streamB.apply(
+				ParDo.of(new ExtractEventDataFn()));
+
+		// key -> CoGbkResult holding the lines of both inputs that share the key
+		PCollection<KV<String, CoGbkResult>> grouped = KeyedPCollectionTuple
+				.of(tagA, keyedA)
+				.and(tagB, keyedB)
+				.apply(CoGroupByKey.<String>create());
+
+		// Combine the single line of stream A with every line of stream B for the same key.
+		PCollection<KV<String, String>> joined =
+				grouped.apply(ParDo.named("Process").of(
+						new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+							private static final long serialVersionUID = 0;
+
+							@Override
+							public void processElement(ProcessContext c) {
+								String key = c.element().getKey();
+								CoGbkResult perKey = c.element().getValue();
+
+								// getOnly is a bit tricky: it expects EXACTLY ONE value of stream A
+								// for the key; "NO_VALUE" is the default handed to it.
+								String lineA = perKey.getOnly(tagA, "NO_VALUE");
+								for (String lineB : perKey.getAll(tagB)) {
+									c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB));
+								}
+							}
+						}));
+
+		return joined
+				.apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
+					private static final long serialVersionUID = 0;
+
+					@Override
+					public void processElement(ProcessContext c) {
+						KV<String, String> pair = c.element();
+						String result = pair.getKey() + " -> " + pair.getValue();
+						System.out.println(result);
+						c.output(result);
+					}
+				}));
+	}
+
+	/**
+	 * Lower-cases every input line and keys it by its first whitespace-delimited token.
+	 *
+	 * <p>A line that is empty or starts with whitespace is keyed by the empty string.
+	 */
+	static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
+		private static final long serialVersionUID = 0;
+
+		@Override
+		public void processElement(ProcessContext c) {
+			// Locale.ROOT keeps the join keys independent of the JVM's default locale.
+			String line = c.element().toLowerCase(java.util.Locale.ROOT);
+			// A limit of 2 always yields at least one element. Without a limit, a line made
+			// only of whitespace splits into an empty array and [0] would throw.
+			String key = line.split("\\s", 2)[0];
+			c.output(KV.of(key, line));
+		}
+	}
+
+	/** Options of this example; nothing is added to the inherited streaming word-count options. */
+	private interface Options extends WindowedWordCount.StreamingWordCountOptions {
+	}
+
+	/**
+	 * Reads two socket streams (localhost:9999 and localhost:9998), applies the same fixed
+	 * windowing to both, joins them with {@code joinEvents} and writes the formatted results
+	 * to {@code ./outputJoin.txt}.
+	 */
+	public static void main(String[] args) throws Exception {
+		Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+		// This example always runs as a streaming job on the Flink runner.
+		options.setStreaming(true);
+		options.setRunner(FlinkPipelineRunner.class);
+
+		PTransform<? super PBegin, PCollection<String>> firstSource =
+				Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream");
+		PTransform<? super PBegin, PCollection<String>> secondSource =
+				Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream");
+
+		// Fixed windows whose size in seconds is taken from the options.
+		WindowFn<Object, ?> fixedWindows = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+		Pipeline pipeline = Pipeline.create(options);
+
+		// Each of the two sources becomes its own windowed input of the pipeline.
+		PCollection<String> streamA = pipeline.apply(firstSource)
+				.apply(Window.<String>into(fixedWindows)
+						.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+						.discardingFiredPanes());
+		PCollection<String> streamB = pipeline.apply(secondSource)
+				.apply(Window.<String>into(fixedWindows)
+						.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+						.discardingFiredPanes());
+
+		joinEvents(streamA, streamB).apply(TextIO.Write.to("./outputJoin.txt"));
+		pipeline.run();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
new file mode 100644
index 0000000..46c9bd6
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.examples.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedFlinkSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.joda.time.Duration;
+
+import java.util.Properties;
+
+public class KafkaWindowedWordCountExample {
+
+	// Default values of the Kafka-related options declared in KafkaStreamingWordCountOptions.
+	static final String KAFKA_TOPIC = "test";  // Default kafka topic to read from
+	static final String KAFKA_BROKER = "localhost:9092";  // Default kafka broker to contact
+	static final String GROUP_ID = "myGroup";  // Default groupId
+	static final String ZOOKEEPER = "localhost:2181";  // Default zookeeper to connect to for Kafka
+
+	/**
+	 * Splits every input line into words (runs of ASCII letters and apostrophes) and emits
+	 * each non-empty word. Lines that are empty after trimming are counted in the
+	 * {@code emptyLines} aggregator.
+	 */
+	public static class ExtractWordsFn extends DoFn<String, String> {
+		private final Aggregator<Long, Long> emptyLines =
+				createAggregator("emptyLines", new Sum.SumLongFn());
+
+		@Override
+		public void processElement(ProcessContext c) {
+			String line = c.element();
+			if (line.trim().isEmpty()) {
+				emptyLines.addValue(1L);
+			}
+
+			// Tokenize the line and forward every token that is not empty.
+			for (String token : line.split("[^a-zA-Z']+")) {
+				if (token.isEmpty()) {
+					continue;
+				}
+				c.output(token);
+			}
+		}
+	}
+
+	/**
+	 * Formats a word count as {@code <word> - <count> @ <timestamp>}, prints the line to
+	 * standard output and emits it.
+	 */
+	public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+		@Override
+		public void processElement(ProcessContext c) {
+			KV<String, Long> wordCount = c.element();
+			String word = wordCount.getKey();
+			Long count = wordCount.getValue();
+			String row = word + " - " + count + " @ " + c.timestamp().toString();
+			System.out.println(row);
+			c.output(row);
+		}
+	}
+
+	/**
+	 * Options of the Kafka example: the streaming word-count options plus the topic, broker,
+	 * Zookeeper address and consumer group, each with a default taken from the constants of
+	 * the enclosing class.
+	 */
+	public interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions {
+		@Description("The Kafka topic to read from")
+		@Default.String(KAFKA_TOPIC)
+		String getKafkaTopic();
+		void setKafkaTopic(String value);
+
+		@Description("The Kafka Broker to read from")
+		@Default.String(KAFKA_BROKER)
+		String getBroker();
+		void setBroker(String value);
+
+		@Description("The Zookeeper server to connect to")
+		@Default.String(ZOOKEEPER)
+		String getZookeeper();
+		void setZookeeper(String value);
+
+		@Description("The groupId")
+		@Default.String(GROUP_ID)
+		String getGroup();
+		void setGroup(String value);
+	}
+
+	/**
+	 * Runs a streaming word count over a Kafka topic: the topic is consumed through a Flink
+	 * Kafka consumer, lines are split into words, words are counted per fixed window and the
+	 * formatted counts are written to {@code ./outputKafka.txt}.
+	 *
+	 * @param args command-line arguments, parsed into {@link KafkaStreamingWordCountOptions}
+	 */
+	public static void main(String[] args) {
+		PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
+		KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
+		options.setJobName("KafkaExample");
+		options.setStreaming(true);
+		options.setRunner(FlinkPipelineRunner.class);
+
+		System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() );
+		Pipeline pipeline = Pipeline.create(options);
+
+		// Connection settings handed to the Kafka consumer.
+		Properties kafkaProps = new Properties();
+		kafkaProps.setProperty("zookeeper.connect", options.getZookeeper());
+		kafkaProps.setProperty("bootstrap.servers", options.getBroker());
+		kafkaProps.setProperty("group.id", options.getGroup());
+
+		// this is the Flink consumer that reads the input to
+		// the program from a kafka topic. It is declared with its element type
+		// (instead of as a raw type) so that the source below is type-checked.
+		FlinkKafkaConsumer082<String> kafkaConsumer = new FlinkKafkaConsumer082<>(
+				options.getKafkaTopic(),
+				new SimpleStringSchema(), kafkaProps);
+
+		PCollection<String> words = pipeline
+				.apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount"))
+				.apply(ParDo.of(new ExtractWordsFn()))
+				.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
+						.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+						.discardingFiredPanes());
+
+		PCollection<KV<String, Long>> wordCounts =
+				words.apply(Count.<String>perElement());
+
+		wordCounts.apply(ParDo.of(new FormatAsStringFn()))
+				.apply(TextIO.Write.to("./outputKafka.txt"));
+
+		pipeline.run();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
new file mode 100644
index 0000000..1d4a44b
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.examples.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSocketSource;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.*;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * To run the example, first open a socket on a terminal by executing the command:
+ * <ul>
+ *     <li>
+ *     <code>nc -lk 9999</code>
+ *     </li>
+ * </ul>
+ * and then launch the example. Now whatever you type in the terminal is going to be
+ * the input to the program.
+ */
+public class WindowedWordCount {
+
+	private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
+
+	// Default values of the options declared in StreamingWordCountOptions.
+	static final long WINDOW_SIZE = 10;  // Default window duration in seconds
+	static final long SLIDE_SIZE = 5;  // Default window slide in seconds
+
+	/** Formats a word count as {@code <word> - <count> @ <timestamp>} and emits the line. */
+	static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+		@Override
+		public void processElement(ProcessContext c) {
+			KV<String, Long> wordCount = c.element();
+			String word = wordCount.getKey();
+			Long count = wordCount.getValue();
+			c.output(word + " - " + count + " @ " + c.timestamp().toString());
+		}
+	}
+
+	/**
+	 * Splits every input line into words (runs of ASCII letters and apostrophes) and emits
+	 * each non-empty word. Lines that are empty after trimming are counted in the
+	 * {@code emptyLines} aggregator.
+	 */
+	static class ExtractWordsFn extends DoFn<String, String> {
+		private final Aggregator<Long, Long> emptyLines =
+				createAggregator("emptyLines", new Sum.SumLongFn());
+
+		@Override
+		public void processElement(ProcessContext c) {
+			String line = c.element();
+			if (line.trim().isEmpty()) {
+				emptyLines.addValue(1L);
+			}
+
+			// Tokenize the line and forward every token that is not empty.
+			for (String token : line.split("[^a-zA-Z']+")) {
+				if (token.isEmpty()) {
+					continue;
+				}
+				c.output(token);
+			}
+		}
+	}
+
+	/**
+	 * Options of the streaming word count: the batch word-count options plus the window
+	 * duration and the window slide, both in seconds.
+	 */
+	public interface StreamingWordCountOptions extends com.dataartisans.flink.dataflow.examples.WordCount.Options {
+		@Description("Sliding window duration, in seconds")
+		@Default.Long(WINDOW_SIZE)
+		Long getWindowSize();
+		void setWindowSize(Long value);
+
+		@Description("Window slide, in seconds")
+		@Default.Long(SLIDE_SIZE)
+		Long getSlide();
+		void setSlide(Long value);
+	}
+
+	/**
+	 * Runs a streaming word count over lines read from a socket on localhost:9999, using
+	 * sliding windows, and writes the formatted counts to {@code ./outputWordCount.txt}.
+	 *
+	 * <p>The window size and slide come from the options, so values given on the command
+	 * line are honored; without them the declared defaults apply.
+	 *
+	 * @param args command-line arguments, parsed into {@link StreamingWordCountOptions}
+	 */
+	public static void main(String[] args) throws IOException {
+		StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class);
+		options.setStreaming(true);
+		options.setRunner(FlinkPipelineRunner.class);
+		// The window size and slide are deliberately NOT set here: they already default to
+		// WINDOW_SIZE and SLIDE_SIZE, and overwriting them would discard user-supplied values.
+
+		LOG.info("Windowed WordCount with Sliding Windows of {} sec. and a slide of {}",
+				options.getWindowSize(), options.getSlide());
+
+		Pipeline pipeline = Pipeline.create(options);
+
+		PCollection<String> words = pipeline
+				.apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount"))
+				.apply(ParDo.of(new ExtractWordsFn()))
+				.apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
+						.every(Duration.standardSeconds(options.getSlide())))
+						.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
+						.discardingFiredPanes());
+
+		PCollection<KV<String, Long>> wordCounts =
+				words.apply(Count.<String>perElement());
+
+		wordCounts.apply(ParDo.of(new FormatAsStringFn()))
+				.apply(TextIO.Write.to("./outputWordCount.txt"));
+
+		pipeline.run();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
new file mode 100644
index 0000000..8c0183e
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
+ * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator}
+ */
+public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
+
+	/**
+	 * The necessary context in the case of a batch job.
+	 */
+	private final FlinkBatchTranslationContext batchContext;
+
+	/**
+	 * Nesting level of the composite transforms entered so far; used to indent the
+	 * trace printed by the visitor methods.
+	 */
+	private int depth = 0;
+
+	/**
+	 * Composite transform that we want to translate before proceeding with other transforms.
+	 */
+	private PTransform<?, ?> currentCompositeTransform;
+
+	/**
+	 * Creates a translator whose batch context is built from the given environment and options.
+	 */
+	public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
+		this.batchContext = new FlinkBatchTranslationContext(env, options);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Pipeline Visitor Methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Remembers the entered composite transform as the one to translate as a whole, provided
+	 * no other composite is being tracked and a translator exists for it. A CoGroupByKey is
+	 * only remembered when it has exactly two inputs.
+	 */
+	@Override
+	public void enterCompositeTransform(TransformTreeNode node) {
+		System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+
+		PTransform<?, ?> transform = node.getTransform();
+		boolean translatable = transform != null
+				&& currentCompositeTransform == null
+				&& FlinkBatchTransformTranslators.getTranslator(transform) != null;
+		if (translatable) {
+			// we can only optimize CoGroupByKey for input size 2
+			boolean unsupportedCoGroup =
+					transform instanceof CoGroupByKey && node.getInput().expand().size() != 2;
+			currentCompositeTransform = unsupportedCoGroup ? null : transform;
+		}
+		this.depth++;
+	}
+
+	/**
+	 * Translates the composite transform being left if it is the one remembered in
+	 * {@code enterCompositeTransform}, then stops tracking it.
+	 *
+	 * @throws IllegalStateException if no translator is found for the remembered transform
+	 */
+	@Override
+	public void leaveCompositeTransform(TransformTreeNode node) {
+		PTransform<?, ?> transform = node.getTransform();
+		boolean isTrackedComposite = transform != null && currentCompositeTransform == transform;
+		if (isTrackedComposite) {
+			BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+			if (translator == null) {
+				throw new IllegalStateException("Attempted to translate composite transform " +
+						"but no translator was found: " + currentCompositeTransform);
+			}
+			System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+			applyBatchTransform(transform, node, translator);
+			currentCompositeTransform = null;
+		}
+		this.depth--;
+		System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+	}
+
+	/**
+	 * Translates a primitive transform into its Flink alternative, unless it belongs to a
+	 * composite transform that is translated as a whole.
+	 *
+	 * @throws UnsupportedOperationException if no translator exists for the transform
+	 */
+	@Override
+	public void visitTransform(TransformTreeNode node) {
+		System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+
+		// Transforms nested in a tracked composite are covered by the composite's translation.
+		boolean insideTrackedComposite = currentCompositeTransform != null;
+		if (!insideTrackedComposite) {
+			// get the transformation corresponding to the node we are
+			// currently visiting and translate it into its Flink alternative.
+			PTransform<?, ?> transform = node.getTransform();
+			BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+			if (translator == null) {
+				System.out.println(node.getTransform().getClass());
+				throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
+			}
+			applyBatchTransform(transform, node, translator);
+		}
+	}
+
+	/**
+	 * Intentionally empty: this visitor performs no work when a value is visited.
+	 */
+	@Override
+	public void visitValue(PValue value, TransformTreeNode producer) {
+		// do nothing here
+	}
+
+	/**
+	 * Registers the given transform as the current one on the batch context and lets the
+	 * translator translate it.
+	 *
+	 * <p>No null check of the batch context is needed: the field is final and is always
+	 * assigned a new instance by the constructor.
+	 *
+	 * @param transform the transform to translate
+	 * @param node the tree node of the transform; supplies its full name, input and output
+	 * @param translator the translator to use; it is expected to match the transform's type
+	 */
+	private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) {
+		@SuppressWarnings("unchecked")
+		T typedTransform = (T) transform;
+
+		@SuppressWarnings("unchecked")
+		BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
+
+		// create the applied PTransform on the batchContext
+		batchContext.setCurrentTransform(AppliedPTransform.of(
+				node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
+		typedTranslator.translateNode(typedTransform, batchContext);
+	}
+
+	/**
+	 * A translator of a {@link PTransform}.
+	 *
+	 * @param <Type> the kind of transform this translator handles
+	 */
+	public interface BatchTransformTranslator<Type extends PTransform> {
+		/**
+		 * Translates the given transform using the given batch translation context.
+		 */
+		void translateNode(Type transform, FlinkBatchTranslationContext context);
+	}
+
+	/**
+	 * Builds the indentation used by the printed trace.
+	 *
+	 * @param n the number of indentation units; values below one yield an empty string
+	 * @return the unit {@code "|   "} repeated {@code n} times
+	 */
+	private static String genSpaces(int n) {
+		// A StringBuilder avoids creating a new String on every iteration.
+		StringBuilder indent = new StringBuilder();
+		for (int i = 0; i < n; i++) {
+			indent.append("|   ");
+		}
+		return indent.toString();
+	}
+
+	/**
+	 * Builds the label printed for a node: the second "@"-separated part of the node's
+	 * string form, followed by the node's transform.
+	 *
+	 * <p>Assumes {@code node.toString()} contains an "@" (as the default
+	 * {@code Object.toString()} does); otherwise the array access fails.
+	 */
+	private static String formatNodeName(TransformTreeNode node) {
+		String[] parts = node.toString().split("@");
+		return parts[1] + node.getTransform();
+	}
+}