Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:11 UTC

[16/50] [abbrv] incubator-beam git commit: Rearranging the code and renaming certain classes.

Rearranging the code and renaming certain classes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc4c60eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc4c60eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc4c60eb

Branch: refs/heads/master
Commit: bc4c60ebfa49ad050367533809c265375e8c0b01
Parents: 37a9b29
Author: kl0u <kk...@gmail.com>
Authored: Mon Feb 29 12:38:56 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../dataflow/FlinkJobExecutionEnvironment.java  | 237 -----------------
 .../FlinkPipelineExecutionEnvironment.java      | 255 +++++++++++++++++++
 .../flink/dataflow/FlinkPipelineOptions.java    |  23 +-
 .../flink/dataflow/FlinkPipelineRunner.java     |   6 +-
 .../examples/streaming/AutoComplete.java        |   7 +-
 .../examples/streaming/JoinExamples.java        |   5 +-
 .../KafkaWindowedWordCountExample.java          |   3 +
 .../examples/streaming/WindowedWordCount.java   |   3 +
 .../FlinkStreamingTransformTranslators.java     |   2 -
 .../FlinkStreamingTranslationContext.java       |   9 +-
 .../streaming/FlinkAbstractParDoWrapper.java    |  10 +-
 .../FlinkGroupAlsoByWindowWrapper.java          |   2 -
 .../io/FlinkStreamingCreateFunction.java        |   7 +-
 .../streaming/io/UnboundedSourceWrapper.java    |  11 +-
 .../dataartisans/flink/dataflow/AvroITCase.java |   4 +-
 .../flink/dataflow/FlattenizeITCase.java        |   2 +-
 .../flink/dataflow/FlinkTestPipeline.java       |  14 +-
 .../flink/dataflow/JoinExamplesITCase.java      |   2 +-
 .../flink/dataflow/MaybeEmptyTestITCase.java    |   2 +-
 .../flink/dataflow/ParDoMultiOutputITCase.java  |   2 +-
 .../flink/dataflow/ReadSourceITCase.java        |   2 +-
 .../dataflow/RemoveDuplicatesEmptyITCase.java   |   2 +-
 .../flink/dataflow/RemoveDuplicatesITCase.java  |   2 +-
 .../flink/dataflow/SideInputITCase.java         |   2 +-
 .../flink/dataflow/TfIdfITCase.java             |   2 +-
 .../dataflow/TopWikipediaSessionsITCase.java    | 144 -----------
 .../flink/dataflow/WordCountITCase.java         |   2 +-
 .../flink/dataflow/WordCountJoin2ITCase.java    |   2 +-
 .../flink/dataflow/WordCountJoin3ITCase.java    |   2 +-
 .../flink/dataflow/WriteSinkITCase.java         |   2 +-
 .../streaming/GroupAlsoByWindowTest.java        |   8 +-
 .../streaming/TopWikipediaSessionsITCase.java   | 145 +++++++++++
 32 files changed, 481 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
deleted file mode 100644
index 91b2f64..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.dataartisans.flink.dataflow;
-
-import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator;
-import com.dataartisans.flink.dataflow.translation.FlinkBatchPipelineTranslator;
-import com.dataartisans.flink.dataflow.translation.FlinkStreamingPipelineTranslator;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.CollectionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class FlinkJobExecutionEnvironment {
-
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkJobExecutionEnvironment.class);
-
-	private final FlinkPipelineOptions options;
-
-	/**
-	 * The Flink Batch execution environment. This is instantiated to either a
-	 * {@link org.apache.flink.api.java.CollectionEnvironment},
-	 * a {@link org.apache.flink.api.java.LocalEnvironment} or
-	 * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
-	 * options.
-	 */
-	private ExecutionEnvironment flinkBatchEnv;
-
-
-	/**
-	 * The Flink Streaming execution environment. This is instantiated to either a
-	 * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
-	 * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
-	 * on the configuration options, and more specifically, the url of the master url.
-	 */
-	private StreamExecutionEnvironment flinkStreamEnv;
-
-	/**
-	 * Translator for this FlinkPipelineRunner. Its role is to translate the Dataflow operators to
-	 * their Flink based counterparts. Based on the options provided by the user, if we have a streaming job,
-	 * this is instantiated to a FlinkStreamingPipelineTranslator. In other case, i.e. a batch job,
-	 * a FlinkBatchPipelineTranslator is created.
-	 */
-	private FlinkPipelineTranslator flinkPipelineTranslator;
-
-	public FlinkJobExecutionEnvironment(FlinkPipelineOptions options) {
-		if (options == null) {
-			throw new IllegalArgumentException("Options in the FlinkJobExecutionEnvironment cannot be NULL.");
-		}
-		this.options = options;
-		this.createJobEnvironment();
-		this.createJobGraphTranslator();
-	}
-
-	/**
-	 * Depending on the type of job (Streaming or Batch) and the user-specified options,
-	 * this method creates the adequate ExecutionEnvironment.
-	 */
-	private void createJobEnvironment() {
-		if (options.isStreaming()) {
-			LOG.info("Creating the required STREAMING Environment.");
-			createStreamExecutionEnvironment();
-		} else {
-			LOG.info("Creating the required BATCH Environment.");
-			createBatchExecutionEnvironment();
-		}
-	}
-
-	/**
-	 * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph
-	 * translator. In the case of batch, it will work with DataSets, while for streaming, it will work
-	 * with DataStreams.
-	 */
-	private void createJobGraphTranslator() {
-		checkInitializationState();
-		if (this.flinkPipelineTranslator != null) {
-			throw new IllegalStateException("JobGraphTranslator already initialized.");
-		}
-
-		this.flinkPipelineTranslator = options.isStreaming() ?
-				new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
-				new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
-	}
-
-	public void translate(Pipeline pipeline) {
-		checkInitializationState();
-		if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
-			createJobEnvironment();
-		}
-		if (this.flinkPipelineTranslator == null) {
-			createJobGraphTranslator();
-		}
-		this.flinkPipelineTranslator.translate(pipeline);
-	}
-
-	public JobExecutionResult executeJob() throws Exception {
-		if (options.isStreaming()) {
-
-			System.out.println("Plan: " + this.flinkStreamEnv.getExecutionPlan());
-
-			if (this.flinkStreamEnv == null) {
-				throw new RuntimeException("JobExecutionEnvironment not initialized.");
-			}
-			if (this.flinkPipelineTranslator == null) {
-				throw new RuntimeException("JobGraphTranslator not initialized.");
-			}
-			return this.flinkStreamEnv.execute();
-		} else {
-			if (this.flinkBatchEnv == null) {
-				throw new RuntimeException("JobExecutionEnvironment not initialized.");
-			}
-			if (this.flinkPipelineTranslator == null) {
-				throw new RuntimeException("JobGraphTranslator not initialized.");
-			}
-			return this.flinkBatchEnv.execute();
-		}
-	}
-
-	/**
-	 * If the submitted job is a batch processing job, this method creates the adequate
-	 * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
-	 * on the user-specified options.
-	 */
-	private void createBatchExecutionEnvironment() {
-		if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
-			throw new RuntimeException("JobExecutionEnvironment already initialized.");
-		}
-
-		String masterUrl = options.getFlinkMaster();
-		this.flinkStreamEnv = null;
-
-		// depending on the master, create the right environment.
-		if (masterUrl.equals("[local]")) {
-			this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
-		} else if (masterUrl.equals("[collection]")) {
-			this.flinkBatchEnv = new CollectionEnvironment();
-		} else if (masterUrl.equals("[auto]")) {
-			this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
-		} else if (masterUrl.matches(".*:\\d*")) {
-			String[] parts = masterUrl.split(":");
-			List<String> stagingFiles = options.getFilesToStage();
-			this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
-					Integer.parseInt(parts[1]),
-					stagingFiles.toArray(new String[stagingFiles.size()]));
-		} else {
-			LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
-			this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
-		}
-
-		// set the correct parallelism.
-		if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) {
-			this.flinkBatchEnv.setParallelism(options.getParallelism());
-		}
-
-		// set parallelism in the options (required by some execution code)
-		options.setParallelism(flinkBatchEnv.getParallelism());
-	}
-
-	/**
-	 * If the submitted job is a stream processing job, this method creates the adequate
-	 * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
-	 * on the user-specified options.
-	 */
-	private void createStreamExecutionEnvironment() {
-		if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
-			throw new RuntimeException("JobExecutionEnvironment already initialized.");
-		}
-
-		String masterUrl = options.getFlinkMaster();
-		this.flinkBatchEnv = null;
-
-		// depending on the master, create the right environment.
-		if (masterUrl.equals("[local]")) {
-			this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
-		} else if (masterUrl.equals("[auto]")) {
-			this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-		} else if (masterUrl.matches(".*:\\d*")) {
-			String[] parts = masterUrl.split(":");
-			List<String> stagingFiles = options.getFilesToStage();
-			this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
-					Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()]));
-		} else {
-			LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
-			this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-		}
-
-		// set the correct parallelism.
-		if (options.getParallelism() != -1) {
-			this.flinkStreamEnv.setParallelism(options.getParallelism());
-		}
-
-		// set parallelism in the options (required by some execution code)
-		options.setParallelism(flinkStreamEnv.getParallelism());
-
-		// although we do not use the generated timestamps,
-		// enabling timestamps is needed for the watermarks.
-		this.flinkStreamEnv.getConfig().enableTimestamps();
-
-		this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-		this.flinkStreamEnv.enableCheckpointing(1000);
-		this.flinkStreamEnv.setNumberOfExecutionRetries(5);
-
-		LOG.info("Setting execution retry delay to 3 sec");
-		this.flinkStreamEnv.getConfig().setExecutionRetryDelay(3000);
-	}
-
-	private void checkInitializationState() {
-		if (this.options == null) {
-			throw new IllegalStateException("FlinkJobExecutionEnvironment is not initialized yet.");
-		}
-
-		if (options.isStreaming() && this.flinkBatchEnv != null) {
-			throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment.");
-		} else if (!options.isStreaming() && this.flinkStreamEnv != null) {
-			throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment.");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
new file mode 100644
index 0000000..a1372bd
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow;
+
+import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator;
+import com.dataartisans.flink.dataflow.translation.FlinkBatchPipelineTranslator;
+import com.dataartisans.flink.dataflow.translation.FlinkStreamingPipelineTranslator;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class FlinkPipelineExecutionEnvironment {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
+
+	private final FlinkPipelineOptions options;
+
+	/**
+	 * The Flink Batch execution environment. This is instantiated to either a
+	 * {@link org.apache.flink.api.java.CollectionEnvironment},
+	 * a {@link org.apache.flink.api.java.LocalEnvironment} or
+	 * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
+	 * options.
+	 */
+	private ExecutionEnvironment flinkBatchEnv;
+
+
+	/**
+	 * The Flink Streaming execution environment. This is instantiated to either a
+	 * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
+	 * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
+	 * on the configuration options, and more specifically, the URL of the master.
+	 */
+	private StreamExecutionEnvironment flinkStreamEnv;
+
+	/**
+	 * Translator for this FlinkPipelineRunner. Its role is to translate the Beam operators into
+	 * their Flink counterparts. Based on the options provided by the user, for a streaming job
+	 * this is instantiated as a {@link FlinkStreamingPipelineTranslator}; otherwise, i.e. for a batch job,
+	 * a {@link FlinkBatchPipelineTranslator} is created.
+	 */
+	private FlinkPipelineTranslator flinkPipelineTranslator;
+
+	/**
+	 * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the
+	 * provided {@link FlinkPipelineOptions}.
+	 *
+	 * @param options the user-defined pipeline options.
+	 * */
+	public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
+		this.options = Preconditions.checkNotNull(options);
+		this.createPipelineExecutionEnvironment();
+		this.createPipelineTranslator();
+	}
+
+	/**
+	 * Depending on the type of job (Streaming or Batch) and the user-specified options,
+	 * this method creates the appropriate ExecutionEnvironment.
+	 */
+	private void createPipelineExecutionEnvironment() {
+		if (options.isStreaming()) {
+			createStreamExecutionEnvironment();
+		} else {
+			createBatchExecutionEnvironment();
+		}
+	}
+
+	/**
+	 * Depending on the type of job (Streaming or Batch), this method creates the appropriate job graph
+	 * translator. In the case of batch, it will work with {@link org.apache.flink.api.java.DataSet},
+	 * while for streaming, it will work with {@link org.apache.flink.streaming.api.datastream.DataStream}.
+	 */
+	private void createPipelineTranslator() {
+		checkInitializationState();
+		if (this.flinkPipelineTranslator != null) {
+			throw new IllegalStateException("FlinkPipelineTranslator already initialized.");
+		}
+
+		this.flinkPipelineTranslator = options.isStreaming() ?
+				new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
+				new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
+	}
+
+	/**
+	 * Depending on whether the job is a Streaming or a Batch one, this method creates
+	 * the necessary execution environment and pipeline translator, and translates
+	 * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program into
+	 * a {@link org.apache.flink.api.java.DataSet} or a
+	 * {@link org.apache.flink.streaming.api.datastream.DataStream}.
+	 * */
+	public void translate(Pipeline pipeline) {
+		checkInitializationState();
+		if (this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
+			createPipelineExecutionEnvironment();
+		}
+		if (this.flinkPipelineTranslator == null) {
+			createPipelineTranslator();
+		}
+		this.flinkPipelineTranslator.translate(pipeline);
+	}
+
+	/**
+	 * Launches the program execution.
+	 * */
+	public JobExecutionResult executePipeline() throws Exception {
+		if (options.isStreaming()) {
+			if (this.flinkStreamEnv == null) {
+				throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
+			}
+			if (this.flinkPipelineTranslator == null) {
+				throw new RuntimeException("FlinkPipelineTranslator not initialized.");
+			}
+			return this.flinkStreamEnv.execute();
+		} else {
+			if (this.flinkBatchEnv == null) {
+				throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
+			}
+			if (this.flinkPipelineTranslator == null) {
+				throw new RuntimeException("FlinkPipelineTranslator not initialized.");
+			}
+			return this.flinkBatchEnv.execute();
+		}
+	}
+
+	/**
+	 * If the submitted job is a batch processing job, this method creates the appropriate
+	 * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
+	 * on the user-specified options.
+	 */
+	private void createBatchExecutionEnvironment() {
+		if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+			throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
+		}
+
+		LOG.info("Creating the required Batch Execution Environment.");
+
+		String masterUrl = options.getFlinkMaster();
+		this.flinkStreamEnv = null;
+
+		// depending on the master, create the right environment.
+		if (masterUrl.equals("[local]")) {
+			this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+		} else if (masterUrl.equals("[collection]")) {
+			this.flinkBatchEnv = new CollectionEnvironment();
+		} else if (masterUrl.equals("[auto]")) {
+			this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+		} else if (masterUrl.matches(".*:\\d*")) {
+			String[] parts = masterUrl.split(":");
+			List<String> stagingFiles = options.getFilesToStage();
+			this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
+					Integer.parseInt(parts[1]),
+					stagingFiles.toArray(new String[stagingFiles.size()]));
+		} else {
+			LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+			this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+		}
+
+		// set the correct parallelism.
+		if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) {
+			this.flinkBatchEnv.setParallelism(options.getParallelism());
+		}
+
+		// set parallelism in the options (required by some execution code)
+		options.setParallelism(flinkBatchEnv.getParallelism());
+	}
+
+	/**
+	 * If the submitted job is a stream processing job, this method creates the appropriate
+	 * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
+	 * on the user-specified options.
+	 */
+	private void createStreamExecutionEnvironment() {
+		if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+			throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
+		}
+
+		LOG.info("Creating the required Streaming Environment.");
+
+		String masterUrl = options.getFlinkMaster();
+		this.flinkBatchEnv = null;
+
+		// depending on the master, create the right environment.
+		if (masterUrl.equals("[local]")) {
+			this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+		} else if (masterUrl.equals("[auto]")) {
+			this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		} else if (masterUrl.matches(".*:\\d*")) {
+			String[] parts = masterUrl.split(":");
+			List<String> stagingFiles = options.getFilesToStage();
+			this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
+					Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()]));
+		} else {
+			LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+			this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+		}
+
+		// set the correct parallelism.
+		if (options.getParallelism() != -1) {
+			this.flinkStreamEnv.setParallelism(options.getParallelism());
+		}
+
+		// set parallelism in the options (required by some execution code)
+		options.setParallelism(flinkStreamEnv.getParallelism());
+
+		// although we do not use the generated timestamps,
+		// enabling timestamps is needed for the watermarks.
+		this.flinkStreamEnv.getConfig().enableTimestamps();
+		this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		// for the following 2 parameters, a value of -1 means that Flink will use
+		// the default values as specified in the configuration.
+		this.flinkStreamEnv.setNumberOfExecutionRetries(options.getNumberOfExecutionRetries());
+		this.flinkStreamEnv.getConfig().setExecutionRetryDelay(options.getExecutionRetryDelay());
+
+		// A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
+		// If the value is not -1, then the validity checks are applied.
+		// By default, checkpointing is disabled.
+		long checkpointInterval = options.getCheckpointingInterval();
+		if (checkpointInterval != -1) {
+			if (checkpointInterval < 1) {
+				throw new IllegalArgumentException("The checkpoint interval must be positive");
+			}
+			this.flinkStreamEnv.enableCheckpointing(checkpointInterval);
+		}
+	}
+
+	private void checkInitializationState() {
+		if (options.isStreaming() && this.flinkBatchEnv != null) {
+			throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment.");
+		} else if (!options.isStreaming() && this.flinkStreamEnv != null) {
+			throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment.");
+		}
+	}
+}
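
For reference, a minimal sketch of how the renamed class is driven end-to-end. The wrapper class RunWithFlinkEnvironment and the option values are illustrative only; the translate()/executePipeline() sequence mirrors what FlinkPipelineRunner does in the hunks below.

    import com.dataartisans.flink.dataflow.FlinkPipelineExecutionEnvironment;
    import com.dataartisans.flink.dataflow.FlinkPipelineOptions;
    import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import org.apache.flink.api.common.JobExecutionResult;

    public class RunWithFlinkEnvironment {
      public static void main(String[] args) throws Exception {
        FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        options.setRunner(FlinkPipelineRunner.class);
        options.setFlinkMaster("[auto]"); // [local], [collection], [auto], or "host:port"
        options.setStreaming(false);      // batch job -> FlinkBatchPipelineTranslator

        Pipeline pipeline = Pipeline.create(options);
        // ... apply the transforms under test to `pipeline` here ...

        FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
        env.translate(pipeline);                           // builds the Flink job graph
        JobExecutionResult result = env.executePipeline(); // submits and runs the job
        System.out.println("Net runtime: " + result.getNetRuntime() + " ms");
      }
    }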

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
index e746f41..2429cac 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
@@ -66,11 +66,26 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
 	String getFlinkMaster();
 	void setFlinkMaster(String value);
 
-	/**
-	 * The degree of parallelism to be used when parallelizing operations onto workers.
-	 */
-	@Description("The degree of parallelism to be used when parallelizing operations onto workers.")
+	@Description("The degree of parallelism to be used when distributing operations onto workers.")
 	@Default.Integer(-1)
 	Integer getParallelism();
 	void setParallelism(Integer value);
+
+	@Description("The interval in milliseconds between consecutive checkpoints (i.e. snapshots of the current " +
+			"pipeline state used for fault tolerance).")
+	@Default.Long(-1L)
+	Long getCheckpointingInterval();
+	void setCheckpointingInterval(Long interval);
+
+	@Description("Sets the number of times that failed tasks are re-executed. " +
+			"A value of zero effectively disables fault tolerance. A value of -1 indicates " +
+			"that the system default value (as defined in the configuration) should be used.")
+	@Default.Integer(-1)
+	Integer getNumberOfExecutionRetries();
+	void setNumberOfExecutionRetries(Integer retries);
+
+	@Description("Sets the delay, in milliseconds, between successive execution retries. A value of -1 indicates that the default value should be used.")
+	@Default.Long(-1L)
+	Long getExecutionRetryDelay();
+	void setExecutionRetryDelay(Long delay);
 }
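
The streaming examples below are updated to enable fault tolerance through these new knobs. A typical configuration, using the values from those examples (not defaults; `options` is assumed to be a FlinkPipelineOptions instance):

    options.setCheckpointingInterval(1000L); // snapshot the pipeline state every second
    options.setNumberOfExecutionRetries(5);  // re-execute failed tasks up to five times
    options.setExecutionRetryDelay(3000L);   // wait 3 s between consecutive retries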

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
index ebd2691..7ea8370 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
@@ -54,7 +54,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 	 */
 	private final FlinkPipelineOptions options;
 
-	private final FlinkJobExecutionEnvironment flinkJobEnv;
+	private final FlinkPipelineExecutionEnvironment flinkJobEnv;
 
 	/**
 	 * Construct a runner from the provided options.
@@ -103,7 +103,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 
 	private FlinkPipelineRunner(FlinkPipelineOptions options) {
 		this.options = options;
-		this.flinkJobEnv = new FlinkJobExecutionEnvironment(options);
+		this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options);
 	}
 
 	@Override
@@ -118,7 +118,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 		
 		JobExecutionResult result;
 		try {
-			result = this.flinkJobEnv.executeJob();
+			result = this.flinkJobEnv.executePipeline();
 		} catch (Exception e) {
 			LOG.error("Pipeline execution failed", e);
 			throw new RuntimeException("Pipeline execution failed", e);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
index 711d9fb..493fb25 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
@@ -325,7 +325,9 @@ public class AutoComplete {
   * Takes as input the top candidates per prefix, and emits an entity
    * suitable for writing to Datastore.
    */
-  static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String> {
+  static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String>
+          implements DoFn.RequiresWindowAccess {
+
     private static final long serialVersionUID = 0;
 
     @Override
@@ -357,6 +359,9 @@ public class AutoComplete {
   public static void main(String[] args) throws IOException {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     options.setStreaming(true);
+    options.setCheckpointingInterval(1000L);
+    options.setNumberOfExecutionRetries(5);
+    options.setExecutionRetryDelay(3000L);
     options.setRunner(FlinkPipelineRunner.class);
 
     PTransform<? super PBegin, PCollection<String>> readSource =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
index 9a5db64..60f6788 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
@@ -124,9 +124,10 @@ public class JoinExamples {
 
 	public static void main(String[] args) throws Exception {
 		Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
-		// make it a streaming example.
 		options.setStreaming(true);
+		options.setCheckpointingInterval(1000L);
+		options.setNumberOfExecutionRetries(5);
+		options.setExecutionRetryDelay(3000L);
 		options.setRunner(FlinkPipelineRunner.class);
 
 		PTransform<? super PBegin, PCollection<String>> readSourceA =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
index 42d3d88..dba2721 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
@@ -104,6 +104,9 @@ public class KafkaWindowedWordCountExample {
 		KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
 		options.setJobName("KafkaExample");
 		options.setStreaming(true);
+		options.setCheckpointingInterval(1000L);
+		options.setNumberOfExecutionRetries(5);
+		options.setExecutionRetryDelay(3000L);
 		options.setRunner(FlinkPipelineRunner.class);
 
 		System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() );

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
index b539245..37dc39a 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
@@ -99,6 +99,9 @@ public class WindowedWordCount {
 		options.setStreaming(true);
 		options.setWindowSize(10L);
 		options.setSlide(5L);
+		options.setCheckpointingInterval(1000L);
+		options.setNumberOfExecutionRetries(5);
+		options.setExecutionRetryDelay(3000L);
 		options.setRunner(FlinkPipelineRunner.class);
 
 		LOG.info("Windowed WordCount with Sliding Windows of " + options.getWindowSize() +

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
index 46d3e36..27cc923 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
@@ -69,7 +69,6 @@ public class FlinkStreamingTransformTranslators {
 
 	// here you can find all the available translators.
 	static {
-
 		TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator());
 		TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
 		TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
@@ -79,7 +78,6 @@ public class FlinkStreamingTransformTranslators {
 		TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
 		TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
 		TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
-
 	}
 
 	public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(PTransform<?, ?> transform) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
index df68e50..7c4ab93 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
@@ -18,7 +18,10 @@ package com.dataartisans.flink.dataflow.translation;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.*;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.common.base.Preconditions;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
@@ -40,8 +43,8 @@ public class FlinkStreamingTranslationContext {
 	private AppliedPTransform<?, ?, ?> currentTransform;
 
 	public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) {
-		this.env = env;
-		this.options = options;
+		this.env = Preconditions.checkNotNull(env);
+		this.options = Preconditions.checkNotNull(options);
 		this.dataStreams = new HashMap<>();
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index 71f9c7f..dfb2b7d 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -116,10 +116,10 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
 
 		@Override
 		public BoundedWindow window() {
-//			if (!(fn instanceof DoFn.RequiresWindowAccess)) {
-//				throw new UnsupportedOperationException(
-//						"window() is only available in the context of a DoFn marked as RequiresWindow.");
-//			}
+			if (!(fn instanceof DoFn.RequiresWindowAccess)) {
+				throw new UnsupportedOperationException(
+						"window() is only available in the context of a DoFn marked as RequiresWindowAccess.");
+			}
 
 			Collection<? extends BoundedWindow> windows = this.element.getWindows();
 			if (windows.size() != 1) {
@@ -211,7 +211,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
 					@Override
 					public Object element() {
 						throw new UnsupportedOperationException(
-								"WindowFn attempted to access input element when none was available"); // TODO: 12/16/15 aljoscha's comment in slack
+								"WindowFn attempted to access input element when none was available");
 					}
 
 					@Override
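
With the window() guard re-enabled, a DoFn must now opt in before accessing its window, as the AutoComplete change earlier in this commit does. A hypothetical sketch (the class name WindowLoggingFn is illustrative), assuming the Dataflow SDK imports shown:

    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;

    // Calling c.window() is legal only because this DoFn implements
    // DoFn.RequiresWindowAccess, matching the guard in the wrapper above.
    class WindowLoggingFn extends DoFn<String, String>
            implements DoFn.RequiresWindowAccess {
      private static final long serialVersionUID = 0;

      @Override
      public void processElement(ProcessContext c) {
        BoundedWindow window = c.window();
        c.output(c.element() + " @ " + window.maxTimestamp());
      }
    }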

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 0f0a9d0..b78db65 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -238,9 +238,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
 				this.operator = StreamingGroupAlsoByWindowsDoFn.createForIterable(
 						this.windowingStrategy, inputValueCoder);
 			} else {
-
 				Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
-				//CoderRegistry dataflowRegistry = input.getPipeline().getCoderRegistry();
 
 				AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn
 						.withInputCoder(combineFn, coderRegistry, inputKvCoder);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
index b8824f5..c952d6f 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
@@ -22,6 +22,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
 
 import java.io.ByteArrayInputStream;
 import java.util.List;
@@ -44,17 +45,15 @@ public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN
 	public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception {
 
 		@SuppressWarnings("unchecked")
-		// TODO Flink doesn't allow null values in records
 		OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE;
-
 		for (byte[] element : elements) {
 			ByteArrayInputStream bai = new ByteArrayInputStream(element);
 			OUT outValue = coder.decode(bai, Coder.Context.OUTER);
 
 			if (outValue == null) {
-				out.collect(WindowedValue.of(voidValue, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+				out.collect(WindowedValue.of(voidValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
 			} else {
-				out.collect(WindowedValue.of(outValue, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+				out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 3e248a6..cdc2e95 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -38,7 +38,7 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
 	private final UnboundedSource.UnboundedReader<T> reader;
 
 	private StreamingRuntimeContext runtime = null;
-	private StreamSource.ManualWatermarkContext<T> context = null;
+	private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;
 
 	private volatile boolean isRunning = false;
 
@@ -51,8 +51,7 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
 		return this.name;
 	}
 
-	WindowedValue<T> makeWindowedValue(
-			T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+	WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
 		if (timestamp == null) {
 			timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
 		}
@@ -66,7 +65,7 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
 					"Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
 		}
 
-		context = (StreamSource.ManualWatermarkContext<T>) ctx;
+		context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
 		runtime = (StreamingRuntimeContext) getRuntimeContext();
 
 		this.isRunning = reader.start();
@@ -78,11 +77,9 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
 			T item = reader.getCurrent();
 			Instant timestamp = reader.getCurrentTimestamp();
 
-			long milliseconds = timestamp.getMillis();
-
 			// write it to the output collector
 			synchronized (ctx.getCheckpointLock()) {
-				ctx.collectWithTimestamp(makeWindowedValue(item, timestamp, null, PaneInfo.NO_FIRING), milliseconds);
+				context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
 			}
 
 			// try to go to the next record

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
index c6e3e99..2b1f091 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
@@ -56,14 +56,14 @@ public class AvroITCase extends JavaProgramTestBase {
 	}
 
 	private static void runProgram(String tmpPath, String resultPath) {
-		Pipeline p = FlinkTestPipeline.create();
+		Pipeline p = FlinkTestPipeline.createForBatch();
 
 		p.apply(Create.of(new User("Joe", 3, "red"), new User("Mary", 4, "blue")).withCoder(AvroCoder.of(User.class)))
 				.apply(AvroIO.Write.to(tmpPath).withSchema(User.class));
 
 		p.run();
 
-		p = FlinkTestPipeline.create();
+		p = FlinkTestPipeline.createForBatch();
 
 		p.apply(AvroIO.Read.from(tmpPath).withSchema(User.class))
 				.apply(ParDo.of(new DoFn<User, String>() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
index bc24514..928388c 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
@@ -51,7 +51,7 @@ public class FlattenizeITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		Pipeline p = FlinkTestPipeline.create();
+		Pipeline p = FlinkTestPipeline.createForBatch();
 
 		PCollection<String> p1 = p.apply(Create.of(words));
 		PCollection<String> p2 = p.apply(Create.of(words2));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
index 109b1ff..56af3f1 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
@@ -32,7 +32,7 @@ public class FlinkTestPipeline extends Pipeline {
 	 * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
 	 * {@link Pipeline#run} to execute the pipeline and check the tests.
 	 */
-	public static FlinkTestPipeline create() {
+	public static FlinkTestPipeline createForBatch() {
 		return create(false);
 	}
 
@@ -44,7 +44,7 @@ public class FlinkTestPipeline extends Pipeline {
 	 *
 	 * @return The Test Pipeline
 	 */
-	public static FlinkTestPipeline createStreaming() {
+	public static FlinkTestPipeline createForStreaming() {
 		return create(true);
 	}
 
@@ -54,18 +54,18 @@ public class FlinkTestPipeline extends Pipeline {
 	 * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
 	 * {@link Pipeline#run} to execute the pipeline and check the tests.
 	 *
-	 * @param streaming True for streaming mode, False for batch
-	 * @return The Test Pipeline
+	 * @param streaming <code>true</code> for streaming mode, <code>false</code> for batch.
+	 * @return The Test Pipeline.
 	 */
-	public static FlinkTestPipeline create(boolean streaming) {
+	private static FlinkTestPipeline create(boolean streaming) {
 		FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
 		FlinkPipelineOptions pipelineOptions = flinkRunner.getPipelineOptions();
 		pipelineOptions.setStreaming(streaming);
 		return new FlinkTestPipeline(flinkRunner, pipelineOptions);
 	}
 
-	private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions
-			options) {
+	private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
+							PipelineOptions options) {
 		super(runner, options);
 	}
 }
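
With create(boolean) now private, existing tests migrate mechanically to the explicit factory methods, as the ITCase hunks below show:

    // before: FlinkTestPipeline.create();           // batch
    // before: FlinkTestPipeline.createStreaming();  // streaming
    Pipeline batch     = FlinkTestPipeline.createForBatch();
    Pipeline streaming = FlinkTestPipeline.createForStreaming();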

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
index ed2ecf5..af0f217 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
@@ -84,7 +84,7 @@ public class JoinExamplesITCase extends JavaProgramTestBase {
 	@Override
 	protected void testProgram() throws Exception {
 
-		Pipeline p = FlinkTestPipeline.create();
+		Pipeline p = FlinkTestPipeline.createForBatch();
 
 		PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY));
 		PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
index 29c34d4..35f2eaf 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
@@ -47,7 +47,7 @@ public class MaybeEmptyTestITCase extends JavaProgramTestBase implements Seriali
 	@Override
 	protected void testProgram() throws Exception {
 
-		Pipeline p = FlinkTestPipeline.create();
+		Pipeline p = FlinkTestPipeline.createForBatch();
 
 		p.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
 				.apply(ParDo.of(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
index dbe88d2..ccdbbf9 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
@@ -47,7 +47,7 @@ public class ParDoMultiOutputITCase extends JavaProgramTestBase implements Seria
 
 	@Override
 	protected void testProgram() throws Exception {
-		Pipeline p = FlinkTestPipeline.create();
+		Pipeline p = FlinkTestPipeline.createForBatch();
 
 		PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO"));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
index ba675b1..39f54e4 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
@@ -61,7 +61,7 @@ public class ReadSourceITCase extends JavaProgramTestBase {
 
 	private static void runProgram(String resultPath) {
 
-		Pipeline p = FlinkTestPipeline.create();
+		Pipeline p = FlinkTestPipeline.createForBatch();
 
 		PCollection<String> result = p
 				.apply(Read.from(new ReadSource(1, 10)))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
index ff59db7..db794f7 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
@@ -52,7 +52,7 @@ public class RemoveDuplicatesEmptyITCase extends JavaProgramTestBase {
 
 		List<String> strings = Collections.emptyList();
 
-		Pipeline p = FlinkTestPipeline.create();
+		Pipeline p = FlinkTestPipeline.createForBatch();
 
 		PCollection<String> input =
 				p.apply(Create.of(strings))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
index a8200aa..04e06b8 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
@@ -53,7 +53,7 @@ public class RemoveDuplicatesITCase extends JavaProgramTestBase {
 
 		List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3");
 
-		Pipeline p = FlinkTestPipeline.create();
+		Pipeline p = FlinkTestPipeline.createForBatch();
 
 		PCollection<String> input =
 				p.apply(Create.of(strings))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
index d932c80..ee8843c 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
@@ -36,7 +36,7 @@ public class SideInputITCase extends JavaProgramTestBase implements Serializable
 	protected void testProgram() throws Exception {
 
 
-		Pipeline p = FlinkTestPipeline.create();
+		Pipeline p = FlinkTestPipeline.createForBatch();
 
 
 		final PCollectionView<String> sidesInput = p

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
index e801ac4..1b4afb3 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
@@ -53,7 +53,7 @@ public class TfIdfITCase extends JavaProgramTestBase {
 	@Override
 	protected void testProgram() throws Exception {
 
-		Pipeline pipeline = FlinkTestPipeline.create();
+		Pipeline pipeline = FlinkTestPipeline.createForBatch();
 
 		pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
deleted file mode 100644
index eb020c5..0000000
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.dataartisans.flink.dataflow;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Joiner;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-
-/**
- * Session window test
- */
-public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
-	protected String resultPath;
-
-	public TopWikipediaSessionsITCase(){
-	}
-
-	static final String[] EXPECTED_RESULT = new String[] {
-			"user: user1 value:3",
-			"user: user1 value:1",
-			"user: user2 value:4",
-			"user: user2 value:6",
-			"user: user3 value:7",
-			"user: user3 value:2"
-	};
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-
-		Pipeline p = FlinkTestPipeline.createStreaming();
-
-		long now = System.currentTimeMillis() + 10000;
-		System.out.println((now + 5000) / 1000);
-
-		PCollection<KV<String, Long>> output =
-			p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set
-					("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now).set
-					("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set
-					("contributor_username", "user1"), new TableRow().set("timestamp", now).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set
-					("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set
-					("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set
-					("contributor_username", "user2"), new TableRow().set("timestamp", now)
-					.set("contributor_username", "user3"))))
-
-
-
-			.apply(ParDo.of(new DoFn<TableRow, String>() {
-				@Override
-				public void processElement(ProcessContext c) throws Exception {
-					TableRow row = c.element();
-					long timestamp = (Long) row.get("timestamp");
-					String userName = (String) row.get("contributor_username");
-					if (userName != null) {
-						// Sets the timestamp field to be used in windowing.
-						c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
-					}
-				}
-			}))
-
-			.apply(ParDo.named("SampleUsers").of(
-					new DoFn<String, String>() {
-						private static final long serialVersionUID = 0;
-
-						@Override
-						public void processElement(ProcessContext c) {
-							if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * 1.0) {
-								c.output(c.element());
-							}
-						}
-					}))
-
-					.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
-					.apply(Count.<String>perElement());
-
-		PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
-			@Override
-			public void processElement(ProcessContext c) throws Exception {
-				KV<String, Long> el = c.element();
-				String out = "user: " + el.getKey() + " value:" + el.getValue();
-				System.out.println(out);
-				c.output(out);
-			}
-		}));
-
-		format.apply(TextIO.Write.to(resultPath));
-
-		p.run();
-	}
-}

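The deletion above is half of a move rather than a removal: the same test reappears at the end of this commit under the streaming test package, with the com.dataartisans.flink.dataflow.FlinkTestPipeline import added and the pipeline obtained via createForStreaming() instead of the old createStreaming().
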
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
index 9427ab6..5ddd379 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
@@ -58,7 +58,7 @@ public class WordCountITCase extends JavaProgramTestBase {
 	@Override
 	protected void testProgram() throws Exception {
 
-		Pipeline p = FlinkTestPipeline.create();
+		Pipeline p = FlinkTestPipeline.createForBatch();
 
 		PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
index c3eed61..ccc52c4 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
@@ -70,7 +70,7 @@ public class WordCountJoin2ITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void testProgram() throws Exception {
-		Pipeline p = FlinkTestPipeline.create();
+		Pipeline p = FlinkTestPipeline.createForBatch();
 
 		/* Create two PCollections and join them */
 		PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
index 33e67cc..e6eddc0 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
@@ -80,7 +80,7 @@ public class WordCountJoin3ITCase extends JavaProgramTestBase {
 	@Override
 	protected void testProgram() throws Exception {
 
-		Pipeline p = FlinkTestPipeline.create();
+		Pipeline p = FlinkTestPipeline.createForBatch();
 
 		/* Create two PCollections and join them */
 		PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
index 205fe9b..865fc5f 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
@@ -63,7 +63,7 @@ public class WriteSinkITCase extends JavaProgramTestBase {
 	}
 
 	private static void runProgram(String resultPath) {
-		Pipeline p = FlinkTestPipeline.create();
+		Pipeline p = FlinkTestPipeline.createForBatch();
 
 		p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of())
 			.apply("CustomSink", Write.to(new MyCustomSink(resultPath)));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
index b667187..1f36ee7 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
@@ -86,7 +86,7 @@ public class GroupAlsoByWindowTest {
 				.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
 				.withAllowedLateness(Duration.millis(1000));
 		long initialTime = 0L;
-		Pipeline pipeline = FlinkTestPipeline.create();
+		Pipeline pipeline = FlinkTestPipeline.createForStreaming();
 
 		KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
 
@@ -145,7 +145,7 @@ public class GroupAlsoByWindowTest {
 		WindowingStrategy strategy = sessionWindowingStrategy;
 
 		long initialTime = 0L;
-		Pipeline pipeline = FlinkTestPipeline.create();
+		Pipeline pipeline = FlinkTestPipeline.createForStreaming();
 
 		KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
 
@@ -382,7 +382,7 @@ public class GroupAlsoByWindowTest {
 	}
 
 	private OneInputStreamOperatorTestHarness createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) throws Exception {
-		Pipeline pipeline = FlinkTestPipeline.create();
+		Pipeline pipeline = FlinkTestPipeline.createForStreaming();
 
 		KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
 
@@ -478,7 +478,7 @@ public class GroupAlsoByWindowTest {
 					@Override
 					public Object element() {
 						throw new UnsupportedOperationException(
-								"WindowFn attempted to access input element when none was available"); // TODO: 12/16/15 aljoscha's comment in slack
+								"WindowFn attempted to access input element when none was available");
 					}
 
 					@Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
new file mode 100644
index 0000000..1c800fa
--- /dev/null
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkTestPipeline;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+
+/**
+ * Session window test
+ */
+public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
+	protected String resultPath;
+
+	public TopWikipediaSessionsITCase(){
+	}
+
+	static final String[] EXPECTED_RESULT = new String[] {
+			"user: user1 value:3",
+			"user: user1 value:1",
+			"user: user2 value:4",
+			"user: user2 value:6",
+			"user: user3 value:7",
+			"user: user3 value:2"
+	};
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+
+		Pipeline p = FlinkTestPipeline.createForStreaming();
+
+		long now = System.currentTimeMillis() + 10000;
+		System.out.println((now + 5000) / 1000);
+
+		PCollection<KV<String, Long>> output =
+			p.apply(Create.of(Arrays.asList(
+					new TableRow().set("timestamp", now).set("contributor_username", "user1"),
+					new TableRow().set("timestamp", now + 10).set("contributor_username", "user3"),
+					new TableRow().set("timestamp", now).set("contributor_username", "user2"),
+					new TableRow().set("timestamp", now).set("contributor_username", "user1"),
+					new TableRow().set("timestamp", now + 2).set("contributor_username", "user1"),
+					new TableRow().set("timestamp", now).set("contributor_username", "user2"),
+					new TableRow().set("timestamp", now + 1).set("contributor_username", "user2"),
+					new TableRow().set("timestamp", now + 5).set("contributor_username", "user2"),
+					new TableRow().set("timestamp", now + 7).set("contributor_username", "user2"),
+					new TableRow().set("timestamp", now + 8).set("contributor_username", "user2"),
+					new TableRow().set("timestamp", now + 200).set("contributor_username", "user2"),
+					new TableRow().set("timestamp", now + 230).set("contributor_username", "user1"),
+					new TableRow().set("timestamp", now + 230).set("contributor_username", "user2"),
+					new TableRow().set("timestamp", now + 240).set("contributor_username", "user2"),
+					new TableRow().set("timestamp", now + 245).set("contributor_username", "user3"),
+					new TableRow().set("timestamp", now + 235).set("contributor_username", "user3"),
+					new TableRow().set("timestamp", now + 236).set("contributor_username", "user3"),
+					new TableRow().set("timestamp", now + 237).set("contributor_username", "user3"),
+					new TableRow().set("timestamp", now + 238).set("contributor_username", "user3"),
+					new TableRow().set("timestamp", now + 239).set("contributor_username", "user3"),
+					new TableRow().set("timestamp", now + 240).set("contributor_username", "user3"),
+					new TableRow().set("timestamp", now + 241).set("contributor_username", "user2"),
+					new TableRow().set("timestamp", now).set("contributor_username", "user3"))))
+
+
+
+			.apply(ParDo.of(new DoFn<TableRow, String>() {
+				@Override
+				public void processElement(ProcessContext c) throws Exception {
+					TableRow row = c.element();
+					long timestamp = (Long) row.get("timestamp");
+					String userName = (String) row.get("contributor_username");
+					if (userName != null) {
+						// Sets the timestamp field to be used in windowing.
+						c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
+					}
+				}
+			}))
+
+			.apply(ParDo.named("SampleUsers").of(
+					new DoFn<String, String>() {
+						private static final long serialVersionUID = 0;
+
+						@Override
+						public void processElement(ProcessContext c) {
+							if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * 1.0) {
+								c.output(c.element());
+							}
+						}
+					}))
+
+					.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
+					.apply(Count.<String>perElement());
+
+		PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
+			@Override
+			public void processElement(ProcessContext c) throws Exception {
+				KV<String, Long> el = c.element();
+				String out = "user: " + el.getKey() + " value:" + el.getValue();
+				System.out.println(out);
+				c.output(out);
+			}
+		}));
+
+		format.apply(TextIO.Write.to(resultPath));
+
+		p.run();
+	}
+}
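
For reference, the EXPECTED_RESULT values can be checked by hand against the one-minute session gap. The first ParDo builds the event-time Instant as timestamp * 1000L, so a difference of N between two raw "timestamp" values becomes a gap of N seconds, and any per-user gap over 60 seconds starts a new session:

	user1: now, now, now+2 (3 elements) and now+230 (1)   -> sessions of 3 and 1
	user2: now..now+8 (6) and now+200..now+241 (4)        -> sessions of 6 and 4
	user3: now, now+10 (2) and now+235..now+245 (7)       -> sessions of 2 and 7

Count.<String>perElement() then emits one KV<user, count> per session window, which the final ParDo formats into the expected "user: ... value:..." strings.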