You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:11 UTC
[16/50] [abbrv] incubator-beam git commit: Rearranging the code and renaming certain classes.
Rearranging the code and renaming certain classes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc4c60eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc4c60eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc4c60eb
Branch: refs/heads/master
Commit: bc4c60ebfa49ad050367533809c265375e8c0b01
Parents: 37a9b29
Author: kl0u <kk...@gmail.com>
Authored: Mon Feb 29 12:38:56 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800
----------------------------------------------------------------------
.../dataflow/FlinkJobExecutionEnvironment.java | 237 -----------------
.../FlinkPipelineExecutionEnvironment.java | 255 +++++++++++++++++++
.../flink/dataflow/FlinkPipelineOptions.java | 23 +-
.../flink/dataflow/FlinkPipelineRunner.java | 6 +-
.../examples/streaming/AutoComplete.java | 7 +-
.../examples/streaming/JoinExamples.java | 5 +-
.../KafkaWindowedWordCountExample.java | 3 +
.../examples/streaming/WindowedWordCount.java | 3 +
.../FlinkStreamingTransformTranslators.java | 2 -
.../FlinkStreamingTranslationContext.java | 9 +-
.../streaming/FlinkAbstractParDoWrapper.java | 10 +-
.../FlinkGroupAlsoByWindowWrapper.java | 2 -
.../io/FlinkStreamingCreateFunction.java | 7 +-
.../streaming/io/UnboundedSourceWrapper.java | 11 +-
.../dataartisans/flink/dataflow/AvroITCase.java | 4 +-
.../flink/dataflow/FlattenizeITCase.java | 2 +-
.../flink/dataflow/FlinkTestPipeline.java | 14 +-
.../flink/dataflow/JoinExamplesITCase.java | 2 +-
.../flink/dataflow/MaybeEmptyTestITCase.java | 2 +-
.../flink/dataflow/ParDoMultiOutputITCase.java | 2 +-
.../flink/dataflow/ReadSourceITCase.java | 2 +-
.../dataflow/RemoveDuplicatesEmptyITCase.java | 2 +-
.../flink/dataflow/RemoveDuplicatesITCase.java | 2 +-
.../flink/dataflow/SideInputITCase.java | 2 +-
.../flink/dataflow/TfIdfITCase.java | 2 +-
.../dataflow/TopWikipediaSessionsITCase.java | 144 -----------
.../flink/dataflow/WordCountITCase.java | 2 +-
.../flink/dataflow/WordCountJoin2ITCase.java | 2 +-
.../flink/dataflow/WordCountJoin3ITCase.java | 2 +-
.../flink/dataflow/WriteSinkITCase.java | 2 +-
.../streaming/GroupAlsoByWindowTest.java | 8 +-
.../streaming/TopWikipediaSessionsITCase.java | 145 +++++++++++
32 files changed, 481 insertions(+), 440 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
deleted file mode 100644
index 91b2f64..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.dataartisans.flink.dataflow;
-
-import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator;
-import com.dataartisans.flink.dataflow.translation.FlinkBatchPipelineTranslator;
-import com.dataartisans.flink.dataflow.translation.FlinkStreamingPipelineTranslator;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.CollectionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class FlinkJobExecutionEnvironment {
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkJobExecutionEnvironment.class);
-
- private final FlinkPipelineOptions options;
-
- /**
- * The Flink Batch execution environment. This is instantiated to either a
- * {@link org.apache.flink.api.java.CollectionEnvironment},
- * a {@link org.apache.flink.api.java.LocalEnvironment} or
- * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
- * options.
- */
- private ExecutionEnvironment flinkBatchEnv;
-
-
- /**
- * The Flink Streaming execution environment. This is instantiated to either a
- * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
- * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
- * on the configuration options, and more specifically, the url of the master url.
- */
- private StreamExecutionEnvironment flinkStreamEnv;
-
- /**
- * Translator for this FlinkPipelineRunner. Its role is to translate the Dataflow operators to
- * their Flink based counterparts. Based on the options provided by the user, if we have a streaming job,
- * this is instantiated to a FlinkStreamingPipelineTranslator. In other case, i.e. a batch job,
- * a FlinkBatchPipelineTranslator is created.
- */
- private FlinkPipelineTranslator flinkPipelineTranslator;
-
- public FlinkJobExecutionEnvironment(FlinkPipelineOptions options) {
- if (options == null) {
- throw new IllegalArgumentException("Options in the FlinkJobExecutionEnvironment cannot be NULL.");
- }
- this.options = options;
- this.createJobEnvironment();
- this.createJobGraphTranslator();
- }
-
- /**
- * Depending on the type of job (Streaming or Batch) and the user-specified options,
- * this method creates the adequate ExecutionEnvironment.
- */
- private void createJobEnvironment() {
- if (options.isStreaming()) {
- LOG.info("Creating the required STREAMING Environment.");
- createStreamExecutionEnvironment();
- } else {
- LOG.info("Creating the required BATCH Environment.");
- createBatchExecutionEnvironment();
- }
- }
-
- /**
- * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph
- * translator. In the case of batch, it will work with DataSets, while for streaming, it will work
- * with DataStreams.
- */
- private void createJobGraphTranslator() {
- checkInitializationState();
- if (this.flinkPipelineTranslator != null) {
- throw new IllegalStateException("JobGraphTranslator already initialized.");
- }
-
- this.flinkPipelineTranslator = options.isStreaming() ?
- new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
- new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
- }
-
- public void translate(Pipeline pipeline) {
- checkInitializationState();
- if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
- createJobEnvironment();
- }
- if (this.flinkPipelineTranslator == null) {
- createJobGraphTranslator();
- }
- this.flinkPipelineTranslator.translate(pipeline);
- }
-
- public JobExecutionResult executeJob() throws Exception {
- if (options.isStreaming()) {
-
- System.out.println("Plan: " + this.flinkStreamEnv.getExecutionPlan());
-
- if (this.flinkStreamEnv == null) {
- throw new RuntimeException("JobExecutionEnvironment not initialized.");
- }
- if (this.flinkPipelineTranslator == null) {
- throw new RuntimeException("JobGraphTranslator not initialized.");
- }
- return this.flinkStreamEnv.execute();
- } else {
- if (this.flinkBatchEnv == null) {
- throw new RuntimeException("JobExecutionEnvironment not initialized.");
- }
- if (this.flinkPipelineTranslator == null) {
- throw new RuntimeException("JobGraphTranslator not initialized.");
- }
- return this.flinkBatchEnv.execute();
- }
- }
-
- /**
- * If the submitted job is a batch processing job, this method creates the adequate
- * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
- * on the user-specified options.
- */
- private void createBatchExecutionEnvironment() {
- if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
- throw new RuntimeException("JobExecutionEnvironment already initialized.");
- }
-
- String masterUrl = options.getFlinkMaster();
- this.flinkStreamEnv = null;
-
- // depending on the master, create the right environment.
- if (masterUrl.equals("[local]")) {
- this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
- } else if (masterUrl.equals("[collection]")) {
- this.flinkBatchEnv = new CollectionEnvironment();
- } else if (masterUrl.equals("[auto]")) {
- this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
- } else if (masterUrl.matches(".*:\\d*")) {
- String[] parts = masterUrl.split(":");
- List<String> stagingFiles = options.getFilesToStage();
- this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
- Integer.parseInt(parts[1]),
- stagingFiles.toArray(new String[stagingFiles.size()]));
- } else {
- LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
- this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
- }
-
- // set the correct parallelism.
- if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) {
- this.flinkBatchEnv.setParallelism(options.getParallelism());
- }
-
- // set parallelism in the options (required by some execution code)
- options.setParallelism(flinkBatchEnv.getParallelism());
- }
-
- /**
- * If the submitted job is a stream processing job, this method creates the adequate
- * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
- * on the user-specified options.
- */
- private void createStreamExecutionEnvironment() {
- if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
- throw new RuntimeException("JobExecutionEnvironment already initialized.");
- }
-
- String masterUrl = options.getFlinkMaster();
- this.flinkBatchEnv = null;
-
- // depending on the master, create the right environment.
- if (masterUrl.equals("[local]")) {
- this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
- } else if (masterUrl.equals("[auto]")) {
- this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- } else if (masterUrl.matches(".*:\\d*")) {
- String[] parts = masterUrl.split(":");
- List<String> stagingFiles = options.getFilesToStage();
- this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
- Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()]));
- } else {
- LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
- this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- }
-
- // set the correct parallelism.
- if (options.getParallelism() != -1) {
- this.flinkStreamEnv.setParallelism(options.getParallelism());
- }
-
- // set parallelism in the options (required by some execution code)
- options.setParallelism(flinkStreamEnv.getParallelism());
-
- // although we do not use the generated timestamps,
- // enabling timestamps is needed for the watermarks.
- this.flinkStreamEnv.getConfig().enableTimestamps();
-
- this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- this.flinkStreamEnv.enableCheckpointing(1000);
- this.flinkStreamEnv.setNumberOfExecutionRetries(5);
-
- LOG.info("Setting execution retry delay to 3 sec");
- this.flinkStreamEnv.getConfig().setExecutionRetryDelay(3000);
- }
-
- private void checkInitializationState() {
- if (this.options == null) {
- throw new IllegalStateException("FlinkJobExecutionEnvironment is not initialized yet.");
- }
-
- if (options.isStreaming() && this.flinkBatchEnv != null) {
- throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment.");
- } else if (!options.isStreaming() && this.flinkStreamEnv != null) {
- throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
new file mode 100644
index 0000000..a1372bd
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow;
+
+import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator;
+import com.dataartisans.flink.dataflow.translation.FlinkBatchPipelineTranslator;
+import com.dataartisans.flink.dataflow.translation.FlinkStreamingPipelineTranslator;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class FlinkPipelineExecutionEnvironment {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
+
+ private final FlinkPipelineOptions options;
+
+ /**
+ * The Flink Batch execution environment. This is instantiated to either a
+ * {@link org.apache.flink.api.java.CollectionEnvironment},
+ * a {@link org.apache.flink.api.java.LocalEnvironment} or
+ * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
+ * options.
+ */
+ private ExecutionEnvironment flinkBatchEnv;
+
+
+ /**
+ * The Flink Streaming execution environment. This is instantiated to either a
+ * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
+ * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
+ * on the configuration options, and more specifically, the url of the master.
+ */
+ private StreamExecutionEnvironment flinkStreamEnv;
+
+ /**
+ * Translator for this FlinkPipelineRunner. Its role is to translate the Beam operators to
+ * their Flink counterparts. Based on the options provided by the user, if we have a streaming job,
+ * this is instantiated as a {@link FlinkStreamingPipelineTranslator}. In other case, i.e. a batch job,
+ * a {@link FlinkBatchPipelineTranslator} is created.
+ */
+ private FlinkPipelineTranslator flinkPipelineTranslator;
+
+ /**
+ * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the
+ * provided {@link FlinkPipelineOptions}.
+ *
+ * @param options the user-defined pipeline options.
+ * */
+ public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
+ this.options = Preconditions.checkNotNull(options);
+ this.createPipelineExecutionEnvironment();
+ this.createPipelineTranslator();
+ }
+
+ /**
+ * Depending on the type of job (Streaming or Batch) and the user-specified options,
+ * this method creates the adequate ExecutionEnvironment.
+ */
+ private void createPipelineExecutionEnvironment() {
+ if (options.isStreaming()) {
+ createStreamExecutionEnvironment();
+ } else {
+ createBatchExecutionEnvironment();
+ }
+ }
+
+ /**
+ * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph
+ * translator. In the case of batch, it will work with {@link org.apache.flink.api.java.DataSet},
+ * while for streaming, it will work with {@link org.apache.flink.streaming.api.datastream.DataStream}.
+ */
+ private void createPipelineTranslator() {
+ checkInitializationState();
+ if (this.flinkPipelineTranslator != null) {
+ throw new IllegalStateException("FlinkPipelineTranslator already initialized.");
+ }
+
+ this.flinkPipelineTranslator = options.isStreaming() ?
+ new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
+ new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
+ }
+
+ /**
+ * Depending on if the job is a Streaming or a Batch one, this method creates
+ * the necessary execution environment and pipeline translator, and translates
+ * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program into
+ * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream}
+ * one.
+ * */
+ public void translate(Pipeline pipeline) {
+ checkInitializationState();
+ if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
+ createPipelineExecutionEnvironment();
+ }
+ if (this.flinkPipelineTranslator == null) {
+ createPipelineTranslator();
+ }
+ this.flinkPipelineTranslator.translate(pipeline);
+ }
+
+ /**
+ * Launches the program execution.
+ * */
+ public JobExecutionResult executePipeline() throws Exception {
+ if (options.isStreaming()) {
+ if (this.flinkStreamEnv == null) {
+ throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
+ }
+ if (this.flinkPipelineTranslator == null) {
+ throw new RuntimeException("FlinkPipelineTranslator not initialized.");
+ }
+ return this.flinkStreamEnv.execute();
+ } else {
+ if (this.flinkBatchEnv == null) {
+ throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
+ }
+ if (this.flinkPipelineTranslator == null) {
+ throw new RuntimeException("FlinkPipelineTranslator not initialized.");
+ }
+ return this.flinkBatchEnv.execute();
+ }
+ }
+
+ /**
+ * If the submitted job is a batch processing job, this method creates the adequate
+ * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
+ * on the user-specified options.
+ */
+ private void createBatchExecutionEnvironment() {
+ if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+ throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
+ }
+
+ LOG.info("Creating the required Batch Execution Environment.");
+
+ String masterUrl = options.getFlinkMaster();
+ this.flinkStreamEnv = null;
+
+ // depending on the master, create the right environment.
+ if (masterUrl.equals("[local]")) {
+ this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+ } else if (masterUrl.equals("[collection]")) {
+ this.flinkBatchEnv = new CollectionEnvironment();
+ } else if (masterUrl.equals("[auto]")) {
+ this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+ } else if (masterUrl.matches(".*:\\d*")) {
+ String[] parts = masterUrl.split(":");
+ List<String> stagingFiles = options.getFilesToStage();
+ this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
+ Integer.parseInt(parts[1]),
+ stagingFiles.toArray(new String[stagingFiles.size()]));
+ } else {
+ LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+ this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ // set the correct parallelism.
+ if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) {
+ this.flinkBatchEnv.setParallelism(options.getParallelism());
+ }
+
+ // set parallelism in the options (required by some execution code)
+ options.setParallelism(flinkBatchEnv.getParallelism());
+ }
+
+ /**
+ * If the submitted job is a stream processing job, this method creates the adequate
+ * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
+ * on the user-specified options.
+ */
+ private void createStreamExecutionEnvironment() {
+ if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+ throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
+ }
+
+ LOG.info("Creating the required Streaming Environment.");
+
+ String masterUrl = options.getFlinkMaster();
+ this.flinkBatchEnv = null;
+
+ // depending on the master, create the right environment.
+ if (masterUrl.equals("[local]")) {
+ this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+ } else if (masterUrl.equals("[auto]")) {
+ this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ } else if (masterUrl.matches(".*:\\d*")) {
+ String[] parts = masterUrl.split(":");
+ List<String> stagingFiles = options.getFilesToStage();
+ this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
+ Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()]));
+ } else {
+ LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+ this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ // set the correct parallelism.
+ if (options.getParallelism() != -1) {
+ this.flinkStreamEnv.setParallelism(options.getParallelism());
+ }
+
+ // set parallelism in the options (required by some execution code)
+ options.setParallelism(flinkStreamEnv.getParallelism());
+
+ // although we do not use the generated timestamps,
+ // enabling timestamps is needed for the watermarks.
+ this.flinkStreamEnv.getConfig().enableTimestamps();
+ this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ // for the following 2 parameters, a value of -1 means that Flink will use
+ // the default values as specified in the configuration.
+ this.flinkStreamEnv.setNumberOfExecutionRetries(options.getNumberOfExecutionRetries());
+ this.flinkStreamEnv.getConfig().setExecutionRetryDelay(options.getExecutionRetryDelay());
+
+ // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
+ // If the value is not -1, then the validity checks are applied.
+ // By default, checkpointing is disabled.
+ long checkpointInterval = options.getCheckpointingInterval();
+ if(checkpointInterval != -1) {
+ if (checkpointInterval < 1) {
+ throw new IllegalArgumentException("The checkpoint interval must be positive");
+ }
+ this.flinkStreamEnv.enableCheckpointing(checkpointInterval);
+ }
+ }
+
+ private void checkInitializationState() {
+ if (options.isStreaming() && this.flinkBatchEnv != null) {
+ throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment.");
+ } else if (!options.isStreaming() && this.flinkStreamEnv != null) {
+ throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
index e746f41..2429cac 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
@@ -66,11 +66,26 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
String getFlinkMaster();
void setFlinkMaster(String value);
- /**
- * The degree of parallelism to be used when parallelizing operations onto workers.
- */
- @Description("The degree of parallelism to be used when parallelizing operations onto workers.")
+ @Description("The degree of parallelism to be used when distributing operations onto workers.")
@Default.Integer(-1)
Integer getParallelism();
void setParallelism(Integer value);
+
+ @Description("The interval between consecutive checkpoints (i.e. snapshots of the current pipeline state used for " +
+ "fault tolerance).")
+ @Default.Long(-1L)
+ Long getCheckpointingInterval();
+ void setCheckpointingInterval(Long interval);
+
+ @Description("Sets the number of times that failed tasks are re-executed. " +
+ "A value of zero effectively disables fault tolerance. A value of -1 indicates " +
+ "that the system default value (as defined in the configuration) should be used.")
+ @Default.Integer(-1)
+ Integer getNumberOfExecutionRetries();
+ void setNumberOfExecutionRetries(Integer retries);
+
+ @Description("Sets the delay between executions. A value of {@code -1} indicates that the default value should be used.")
+ @Default.Long(-1L)
+ Long getExecutionRetryDelay();
+ void setExecutionRetryDelay(Long delay);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
index ebd2691..7ea8370 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
@@ -54,7 +54,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
*/
private final FlinkPipelineOptions options;
- private final FlinkJobExecutionEnvironment flinkJobEnv;
+ private final FlinkPipelineExecutionEnvironment flinkJobEnv;
/**
* Construct a runner from the provided options.
@@ -103,7 +103,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
private FlinkPipelineRunner(FlinkPipelineOptions options) {
this.options = options;
- this.flinkJobEnv = new FlinkJobExecutionEnvironment(options);
+ this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options);
}
@Override
@@ -118,7 +118,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
JobExecutionResult result;
try {
- result = this.flinkJobEnv.executeJob();
+ result = this.flinkJobEnv.executePipeline();
} catch (Exception e) {
LOG.error("Pipeline execution failed", e);
throw new RuntimeException("Pipeline execution failed", e);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
index 711d9fb..493fb25 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
@@ -325,7 +325,9 @@ public class AutoComplete {
* Takes as input a the top candidates per prefix, and emits an entity
* suitable for writing to Datastore.
*/
- static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String> {
+ static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String>
+ implements DoFn.RequiresWindowAccess{
+
private static final long serialVersionUID = 0;
@Override
@@ -357,6 +359,9 @@ public class AutoComplete {
public static void main(String[] args) throws IOException {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
+ options.setCheckpointingInterval(1000L);
+ options.setNumberOfExecutionRetries(5);
+ options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkPipelineRunner.class);
PTransform<? super PBegin, PCollection<String>> readSource =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
index 9a5db64..60f6788 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
@@ -124,9 +124,10 @@ public class JoinExamples {
public static void main(String[] args) throws Exception {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
- // make it a streaming example.
options.setStreaming(true);
+ options.setCheckpointingInterval(1000L);
+ options.setNumberOfExecutionRetries(5);
+ options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkPipelineRunner.class);
PTransform<? super PBegin, PCollection<String>> readSourceA =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
index 42d3d88..dba2721 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
@@ -104,6 +104,9 @@ public class KafkaWindowedWordCountExample {
KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
options.setJobName("KafkaExample");
options.setStreaming(true);
+ options.setCheckpointingInterval(1000L);
+ options.setNumberOfExecutionRetries(5);
+ options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkPipelineRunner.class);
System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() );
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
index b539245..37dc39a 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
@@ -99,6 +99,9 @@ public class WindowedWordCount {
options.setStreaming(true);
options.setWindowSize(10L);
options.setSlide(5L);
+ options.setCheckpointingInterval(1000L);
+ options.setNumberOfExecutionRetries(5);
+ options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkPipelineRunner.class);
LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() +
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
index 46d3e36..27cc923 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
@@ -69,7 +69,6 @@ public class FlinkStreamingTransformTranslators {
// here you can find all the available translators.
static {
-
TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator());
TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
@@ -79,7 +78,6 @@ public class FlinkStreamingTransformTranslators {
TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
-
}
public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
index df68e50..7c4ab93 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
@@ -18,7 +18,10 @@ package com.dataartisans.flink.dataflow.translation;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.*;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -40,8 +43,8 @@ public class FlinkStreamingTranslationContext {
private AppliedPTransform<?, ?, ?> currentTransform;
public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) {
- this.env = env;
- this.options = options;
+ this.env = Preconditions.checkNotNull(env);
+ this.options = Preconditions.checkNotNull(options);
this.dataStreams = new HashMap<>();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index 71f9c7f..dfb2b7d 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -116,10 +116,10 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
@Override
public BoundedWindow window() {
-// if (!(fn instanceof DoFn.RequiresWindowAccess)) {
-// throw new UnsupportedOperationException(
-// "window() is only available in the context of a DoFn marked as RequiresWindow.");
-// }
+ if (!(fn instanceof DoFn.RequiresWindowAccess)) {
+ throw new UnsupportedOperationException(
+ "window() is only available in the context of a DoFn marked as RequiresWindow.");
+ }
Collection<? extends BoundedWindow> windows = this.element.getWindows();
if (windows.size() != 1) {
@@ -211,7 +211,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
@Override
public Object element() {
throw new UnsupportedOperationException(
- "WindowFn attempted to access input element when none was available"); // TODO: 12/16/15 aljoscha's comment in slack
+ "WindowFn attempted to access input element when none was available");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 0f0a9d0..b78db65 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -238,9 +238,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
this.operator = StreamingGroupAlsoByWindowsDoFn.createForIterable(
this.windowingStrategy, inputValueCoder);
} else {
-
Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
- //CoderRegistry dataflowRegistry = input.getPipeline().getCoderRegistry();
AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn
.withInputCoder(combineFn, coderRegistry, inputKvCoder);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
index b8824f5..c952d6f 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
@@ -22,6 +22,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
import java.io.ByteArrayInputStream;
import java.util.List;
@@ -44,17 +45,15 @@ public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN
public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception {
@SuppressWarnings("unchecked")
- // TODO Flink doesn't allow null values in records
OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE;
-
for (byte[] element : elements) {
ByteArrayInputStream bai = new ByteArrayInputStream(element);
OUT outValue = coder.decode(bai, Coder.Context.OUTER);
if (outValue == null) {
- out.collect(WindowedValue.of(voidValue, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+ out.collect(WindowedValue.of(voidValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
} else {
- out.collect(WindowedValue.of(outValue, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+ out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 3e248a6..cdc2e95 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -38,7 +38,7 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
private final UnboundedSource.UnboundedReader<T> reader;
private StreamingRuntimeContext runtime = null;
- private StreamSource.ManualWatermarkContext<T> context = null;
+ private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;
private volatile boolean isRunning = false;
@@ -51,8 +51,7 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
return this.name;
}
- WindowedValue<T> makeWindowedValue(
- T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+ WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
if (timestamp == null) {
timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
}
@@ -66,7 +65,7 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
"Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
}
- context = (StreamSource.ManualWatermarkContext<T>) ctx;
+ context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
runtime = (StreamingRuntimeContext) getRuntimeContext();
this.isRunning = reader.start();
@@ -78,11 +77,9 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
T item = reader.getCurrent();
Instant timestamp = reader.getCurrentTimestamp();
- long milliseconds = timestamp.getMillis();
-
// write it to the output collector
synchronized (ctx.getCheckpointLock()) {
- ctx.collectWithTimestamp(makeWindowedValue(item, timestamp, null, PaneInfo.NO_FIRING), milliseconds);
+ context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
}
// try to go to the next record
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
index c6e3e99..2b1f091 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
@@ -56,14 +56,14 @@ public class AvroITCase extends JavaProgramTestBase {
}
private static void runProgram(String tmpPath, String resultPath) {
- Pipeline p = FlinkTestPipeline.create();
+ Pipeline p = FlinkTestPipeline.createForBatch();
p.apply(Create.of(new User("Joe", 3, "red"), new User("Mary", 4, "blue")).withCoder(AvroCoder.of(User.class)))
.apply(AvroIO.Write.to(tmpPath).withSchema(User.class));
p.run();
- p = FlinkTestPipeline.create();
+ p = FlinkTestPipeline.createForBatch();
p.apply(AvroIO.Read.from(tmpPath).withSchema(User.class))
.apply(ParDo.of(new DoFn<User, String>() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
index bc24514..928388c 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
@@ -51,7 +51,7 @@ public class FlattenizeITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.create();
+ Pipeline p = FlinkTestPipeline.createForBatch();
PCollection<String> p1 = p.apply(Create.of(words));
PCollection<String> p2 = p.apply(Create.of(words2));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
index 109b1ff..56af3f1 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
@@ -32,7 +32,7 @@ public class FlinkTestPipeline extends Pipeline {
* <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
* {@link Pipeline#run} to execute the pipeline and check the tests.
*/
- public static FlinkTestPipeline create() {
+ public static FlinkTestPipeline createForBatch() {
return create(false);
}
@@ -44,7 +44,7 @@ public class FlinkTestPipeline extends Pipeline {
*
* @return The Test Pipeline
*/
- public static FlinkTestPipeline createStreaming() {
+ public static FlinkTestPipeline createForStreaming() {
return create(true);
}
@@ -54,18 +54,18 @@ public class FlinkTestPipeline extends Pipeline {
* <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
* {@link Pipeline#run} to execute the pipeline and check the tests.
*
- * @param streaming True for streaming mode, False for batch
- * @return The Test Pipeline
+ * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch.
+ * @return The Test Pipeline.
*/
- public static FlinkTestPipeline create(boolean streaming) {
+ private static FlinkTestPipeline create(boolean streaming) {
FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
FlinkPipelineOptions pipelineOptions = flinkRunner.getPipelineOptions();
pipelineOptions.setStreaming(streaming);
return new FlinkTestPipeline(flinkRunner, pipelineOptions);
}
- private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions
- options) {
+ private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
+ PipelineOptions options) {
super(runner, options);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
index ed2ecf5..af0f217 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
@@ -84,7 +84,7 @@ public class JoinExamplesITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.create();
+ Pipeline p = FlinkTestPipeline.createForBatch();
PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY));
PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
index 29c34d4..35f2eaf 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
@@ -47,7 +47,7 @@ public class MaybeEmptyTestITCase extends JavaProgramTestBase implements Seriali
@Override
protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.create();
+ Pipeline p = FlinkTestPipeline.createForBatch();
p.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
.apply(ParDo.of(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
index dbe88d2..ccdbbf9 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
@@ -47,7 +47,7 @@ public class ParDoMultiOutputITCase extends JavaProgramTestBase implements Seria
@Override
protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.create();
+ Pipeline p = FlinkTestPipeline.createForBatch();
PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO"));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
index ba675b1..39f54e4 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
@@ -61,7 +61,7 @@ public class ReadSourceITCase extends JavaProgramTestBase {
private static void runProgram(String resultPath) {
- Pipeline p = FlinkTestPipeline.create();
+ Pipeline p = FlinkTestPipeline.createForBatch();
PCollection<String> result = p
.apply(Read.from(new ReadSource(1, 10)))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
index ff59db7..db794f7 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
@@ -52,7 +52,7 @@ public class RemoveDuplicatesEmptyITCase extends JavaProgramTestBase {
List<String> strings = Collections.emptyList();
- Pipeline p = FlinkTestPipeline.create();
+ Pipeline p = FlinkTestPipeline.createForBatch();
PCollection<String> input =
p.apply(Create.of(strings))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
index a8200aa..04e06b8 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
@@ -53,7 +53,7 @@ public class RemoveDuplicatesITCase extends JavaProgramTestBase {
List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3");
- Pipeline p = FlinkTestPipeline.create();
+ Pipeline p = FlinkTestPipeline.createForBatch();
PCollection<String> input =
p.apply(Create.of(strings))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
index d932c80..ee8843c 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
@@ -36,7 +36,7 @@ public class SideInputITCase extends JavaProgramTestBase implements Serializable
protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.create();
+ Pipeline p = FlinkTestPipeline.createForBatch();
final PCollectionView<String> sidesInput = p
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
index e801ac4..1b4afb3 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
@@ -53,7 +53,7 @@ public class TfIdfITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
- Pipeline pipeline = FlinkTestPipeline.create();
+ Pipeline pipeline = FlinkTestPipeline.createForBatch();
pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
deleted file mode 100644
index eb020c5..0000000
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.dataartisans.flink.dataflow;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Joiner;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-
-/**
- * Session window test
- */
-public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
- protected String resultPath;
-
- public TopWikipediaSessionsITCase(){
- }
-
- static final String[] EXPECTED_RESULT = new String[] {
- "user: user1 value:3",
- "user: user1 value:1",
- "user: user2 value:4",
- "user: user2 value:6",
- "user: user3 value:7",
- "user: user3 value:2"
- };
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- Pipeline p = FlinkTestPipeline.createStreaming();
-
- long now = System.currentTimeMillis() + 10000;
- System.out.println((now + 5000) / 1000);
-
- PCollection<KV<String, Long>> output =
- p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set
- ("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now).set
- ("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set
- ("contributor_username", "user1"), new TableRow().set("timestamp", now).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set
- ("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set
- ("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set
- ("contributor_username", "user2"), new TableRow().set("timestamp", now)
- .set("contributor_username", "user3"))))
-
-
-
- .apply(ParDo.of(new DoFn<TableRow, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- TableRow row = c.element();
- long timestamp = (Long) row.get("timestamp");
- String userName = (String) row.get("contributor_username");
- if (userName != null) {
- // Sets the timestamp field to be used in windowing.
- c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
- }
- }
- }))
-
- .apply(ParDo.named("SampleUsers").of(
- new DoFn<String, String>() {
- private static final long serialVersionUID = 0;
-
- @Override
- public void processElement(ProcessContext c) {
- if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * 1.0) {
- c.output(c.element());
- }
- }
- }))
-
- .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
- .apply(Count.<String>perElement());
-
- PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- KV<String, Long> el = c.element();
- String out = "user: " + el.getKey() + " value:" + el.getValue();
- System.out.println(out);
- c.output(out);
- }
- }));
-
- format.apply(TextIO.Write.to(resultPath));
-
- p.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
index 9427ab6..5ddd379 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
@@ -58,7 +58,7 @@ public class WordCountITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.create();
+ Pipeline p = FlinkTestPipeline.createForBatch();
PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
index c3eed61..ccc52c4 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
@@ -70,7 +70,7 @@ public class WordCountJoin2ITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.create();
+ Pipeline p = FlinkTestPipeline.createForBatch();
/* Create two PCollections and join them */
PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
index 33e67cc..e6eddc0 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
@@ -80,7 +80,7 @@ public class WordCountJoin3ITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.create();
+ Pipeline p = FlinkTestPipeline.createForBatch();
/* Create two PCollections and join them */
PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
index 205fe9b..865fc5f 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
@@ -63,7 +63,7 @@ public class WriteSinkITCase extends JavaProgramTestBase {
}
private static void runProgram(String resultPath) {
- Pipeline p = FlinkTestPipeline.create();
+ Pipeline p = FlinkTestPipeline.createForBatch();
p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of())
.apply("CustomSink", Write.to(new MyCustomSink(resultPath)));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
index b667187..1f36ee7 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
@@ -86,7 +86,7 @@ public class GroupAlsoByWindowTest {
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(1000));
long initialTime = 0L;
- Pipeline pipeline = FlinkTestPipeline.create();
+ Pipeline pipeline = FlinkTestPipeline.createForStreaming();
KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
@@ -145,7 +145,7 @@ public class GroupAlsoByWindowTest {
WindowingStrategy strategy = sessionWindowingStrategy;
long initialTime = 0L;
- Pipeline pipeline = FlinkTestPipeline.create();
+ Pipeline pipeline = FlinkTestPipeline.createForStreaming();
KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
@@ -382,7 +382,7 @@ public class GroupAlsoByWindowTest {
}
private OneInputStreamOperatorTestHarness createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) throws Exception {
- Pipeline pipeline = FlinkTestPipeline.create();
+ Pipeline pipeline = FlinkTestPipeline.createForStreaming();
KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
@@ -478,7 +478,7 @@ public class GroupAlsoByWindowTest {
@Override
public Object element() {
throw new UnsupportedOperationException(
- "WindowFn attempted to access input element when none was available"); // TODO: 12/16/15 aljoscha's comment in slack
+ "WindowFn attempted to access input element when none was available");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
new file mode 100644
index 0000000..1c800fa
--- /dev/null
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkTestPipeline;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+
+/**
+ * Streaming integration test for session windows, modelled on the TopWikipediaSessions example.
+ *
+ * <p>Contribution rows are assigned event timestamps, grouped per user into session windows with a
+ * one-minute gap, counted, and the formatted counts are compared against {@link #EXPECTED_RESULT}.
+ *
+ * <p>All {@link DoFn}s are static nested classes so that running the pipeline does not require
+ * serializing this test instance. The class still implements {@link Serializable} to keep its
+ * declared type unchanged.
+ */
+public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
+  protected String resultPath;
+
+  public TopWikipediaSessionsITCase(){
+  }
+
+  /** Expected "user: X value:N" lines, one per (user, session window); order is not significant. */
+  static final String[] EXPECTED_RESULT = new String[] {
+      "user: user1 value:3",
+      "user: user1 value:1",
+      "user: user2 value:4",
+      "user: user2 value:6",
+      "user: user3 value:7",
+      "user: user3 value:2"
+  };
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForStreaming();
+
+    // The "timestamp" field is interpreted as seconds (ExtractUserAndTimestampFn multiplies it by
+    // 1000), so "now" is only an arbitrary base value: the expected session counts depend solely
+    // on the relative offsets below compared with the one-minute session gap.
+    long now = System.currentTimeMillis() + 10000;
+
+    PCollection<KV<String, Long>> output =
+        p.apply(Create.of(Arrays.asList(
+            row(now, "user1"),
+            row(now + 10, "user3"),
+            row(now, "user2"),
+            row(now, "user1"),
+            row(now + 2, "user1"),
+            row(now, "user2"),
+            row(now + 1, "user2"),
+            row(now + 5, "user2"),
+            row(now + 7, "user2"),
+            row(now + 8, "user2"),
+            row(now + 200, "user2"),
+            row(now + 230, "user1"),
+            row(now + 230, "user2"),
+            row(now + 240, "user2"),
+            row(now + 245, "user3"),
+            row(now + 235, "user3"),
+            row(now + 236, "user3"),
+            row(now + 237, "user3"),
+            row(now + 238, "user3"),
+            row(now + 239, "user3"),
+            row(now + 240, "user3"),
+            row(now + 241, "user2"),
+            row(now, "user3"))))
+        .apply(ParDo.of(new ExtractUserAndTimestampFn()))
+        .apply(ParDo.named("SampleUsers").of(new SampleUsersFn()))
+        .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
+        .apply(Count.<String>perElement());
+
+    PCollection<String> format = output.apply(ParDo.of(new FormatCountsFn()));
+
+    format.apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
+
+  /** Builds one input row with the given timestamp (in seconds) and contributor name. */
+  private static TableRow row(long timestamp, String user) {
+    return new TableRow().set("timestamp", timestamp).set("contributor_username", user);
+  }
+
+  /** Emits each row's contributor name, stamped with the row's "timestamp" field (seconds). */
+  private static class ExtractUserAndTimestampFn extends DoFn<TableRow, String> {
+    private static final long serialVersionUID = 0;
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      TableRow row = c.element();
+      // Accept any numeric representation; the concrete boxed type depends on how the row
+      // was materialized, so a hard (Long) cast would be fragile.
+      long timestamp = ((Number) row.get("timestamp")).longValue();
+      String userName = (String) row.get("contributor_username");
+      if (userName != null) {
+        // Sets the timestamp field to be used in windowing.
+        c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
+      }
+    }
+  }
+
+  /**
+   * Mirrors the example's user-sampling step. With a threshold of 1.0 the condition is always
+   * true, so every user is kept; the step exists to exercise an extra ParDo in the pipeline.
+   */
+  private static class SampleUsersFn extends DoFn<String, String> {
+    private static final long serialVersionUID = 0;
+    private static final double SAMPLING_THRESHOLD = 1.0;
+
+    @Override
+    public void processElement(ProcessContext c) {
+      if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * SAMPLING_THRESHOLD) {
+        c.output(c.element());
+      }
+    }
+  }
+
+  /** Formats a (user, count) pair as the "user: X value:N" line checked in postSubmit. */
+  private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
+    private static final long serialVersionUID = 0;
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      KV<String, Long> el = c.element();
+      c.output("user: " + el.getKey() + " value:" + el.getValue());
+    }
+  }
+}