Posted to commits@flink.apache.org by mb...@apache.org on 2015/08/07 14:51:02 UTC
[1/2] flink git commit: [FLINK-2243] [storm-compat] Demonstrating
finite Storm spout functionality on exclamation example -minor renaming
-improving JavaDocs
Repository: flink
Updated Branches:
refs/heads/master f1dd914de -> dba2946f4
[FLINK-2243] [storm-compat] Demonstrating finite Storm spout functionality on exclamation example
-minor renaming
-improving JavaDocs
Closes #853
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dba2946f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dba2946f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dba2946f
Branch: refs/heads/master
Commit: dba2946f465a72f18b6452e7ab34f9198b71a908
Parents: 6d9eeb5
Author: szape <ne...@gmail.com>
Authored: Fri Jun 19 10:43:10 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri Aug 7 14:50:02 2015 +0200
----------------------------------------------------------------------
docs/apis/storm_compatibility.md | 51 +++++++
.../excamation/ExclamationTopology.java | 47 +++++--
.../excamation/ExclamationWithStormBolt.java | 131 +++++++++++++++++
.../excamation/ExclamationWithStormSpout.java | 140 +++++++++++++++++++
.../excamation/StormBoltExclamation.java | 113 ---------------
.../excamation/StormExclamationLocal.java | 25 +++-
.../StormExclamationRemoteByClient.java | 22 +++
.../StormExclamationRemoteBySubmitter.java | 21 +++
.../excamation/StormSpoutExclamation.java | 118 ----------------
.../util/FiniteStormFileSpout.java | 97 +++++++++++++
.../util/FiniteStormInMemorySpout.java | 48 +++++++
.../util/OutputFormatter.java | 11 +-
.../util/RawOutputFormatter.java | 32 -----
.../util/SimpleOutputFormatter.java | 15 +-
.../wordcount/BoltTokenizerWordCount.java | 2 +-
.../wordcount/SpoutSourceWordCount.java | 4 +-
.../wordcount/StormWordCountLocal.java | 2 +-
.../wordcount/StormWordCountRemoteByClient.java | 2 +-
.../StormWordCountRemoteBySubmitter.java | 2 +-
.../wordcount/WordCountTopology.java | 2 +-
.../ExclamationWithStormBoltITCase.java | 47 +++++++
.../ExclamationWithStormSpoutITCase.java | 47 +++++++
.../exclamation/StormBoltExclamationITCase.java | 47 -------
.../StormSpoutExclamationITCase.java | 47 -------
24 files changed, 689 insertions(+), 384 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index b8fe66e..1390b92 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -167,6 +167,57 @@ The input type is `Tuple1<String>` and `Fields("sentence")` specify that `input.
See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.
+# Flink Extensions
+
+## Finite Storm Spouts
+
+In Flink streaming, sources can be finite, i.e., they emit a finite number of records and stop after emitting the last one. Storm spouts, however, always emit infinite streams.
+The bridge between the two approaches is the `FiniteStormSpout` interface which, in addition to the methods of `IRichSpout`, contains a `reachedEnd()` method in which the user can specify a stopping condition.
+The user can create a finite Storm spout by implementing this interface instead of `IRichSpout` and additionally implementing the `reachedEnd()` method.
+When used as part of a Flink topology, a `FiniteStormSpout` should be wrapped in a `FiniteStormSpoutWrapper`.
+
+Although finite Storm spouts are not necessary to embed Storm spouts into a Flink streaming program or to submit a whole Storm topology to Flink, there are cases where they come in handy:
+
+ * making a native Storm spout behave like a finite Flink source with minimal modifications
+ * processing a stream only for some time, after which the spout stops automatically
+ * reading a file into a stream
+ * testing
+
+A `FiniteStormSpout` can still be used as a normal, infinite Storm spout by changing its wrapper class to `StormSpoutWrapper` in the Flink topology.
+
+An example of a finite Storm spout that emits records for 10 seconds only:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+public class TimedFiniteStormSpout extends AbstractStormSpout implements FiniteStormSpout {
+ [...]
+ private long starttime = System.currentTimeMillis();
+
+ public boolean reachedEnd() {
+ return System.currentTimeMillis() - starttime > 10000L;
+ }
+ [...]
+}
+~~~
+</div>
+</div>
+
+Using a `FiniteStormSpout` in a Flink topology:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> rawInput = env.addSource(
+ new FiniteStormSpoutWrapper<String>(new TimedFiniteStormSpout(), true),
+ TypeExtractor.getForClass(String.class));
+
+// process data stream
+[...]
+~~~
+</div>
+</div>
+
# Storm Compatibility Examples
You can find more examples in Maven module `flink-storm-compatibility-examples`.
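The `reachedEnd()` contract described in the documentation hunk above can be illustrated without any Storm or Flink dependency. The following is a minimal sketch, not the actual `FiniteStormSpout` or `FiniteStormSpoutWrapper` code: `FiniteSpout`, `InMemoryFiniteSpout`, and `drain` are hypothetical stand-ins that only mimic the idea of polling a spout until it reports that its input is exhausted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class FiniteSpoutSketch {

    /** Hypothetical stand-in for a spout that can signal the end of its input. */
    public interface FiniteSpout<T> {
        /** Returns the next record; only called while reachedEnd() is false. */
        T nextTuple();

        /** Stopping condition: true once no further records will be emitted. */
        boolean reachedEnd();
    }

    /** Hypothetical finite spout backed by an in-memory list of lines. */
    public static final class InMemoryFiniteSpout implements FiniteSpout<String> {
        private final Iterator<String> lines;

        public InMemoryFiniteSpout(List<String> lines) {
            this.lines = lines.iterator();
        }

        @Override
        public String nextTuple() {
            return lines.next();
        }

        @Override
        public boolean reachedEnd() {
            return !lines.hasNext();
        }
    }

    /** Plays the role of a wrapper: polls the spout until it reports the end. */
    public static <T> List<T> drain(FiniteSpout<T> spout) {
        List<T> out = new ArrayList<>();
        while (!spout.reachedEnd()) {
            out.add(spout.nextTuple());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> result = drain(new InMemoryFiniteSpout(Arrays.asList("a", "b", "c")));
        System.out.println(result);
    }
}
```

Checking the stopping condition before each poll means an empty input terminates immediately instead of blocking on a record that never arrives.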
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
index a5bb571..b7c98a8 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
@@ -20,15 +20,32 @@ package org.apache.flink.stormcompatibility.excamation;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
+import org.apache.flink.stormcompatibility.util.FiniteStormFileSpout;
+import org.apache.flink.stormcompatibility.util.FiniteStormInMemorySpout;
import org.apache.flink.stormcompatibility.util.OutputFormatter;
-import org.apache.flink.stormcompatibility.util.RawOutputFormatter;
+import org.apache.flink.stormcompatibility.util.SimpleOutputFormatter;
import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
-import org.apache.flink.stormcompatibility.util.StormFileSpout;
-import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
/**
- * This is a basic example of a Storm topology.
+ * Implements the "Exclamation" program that attaches five exclamation marks to every line of a text
+ * file in a streaming fashion. The program is constructed as a regular {@link StormTopology}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text path>
+ * <result path></code><br/>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>construct a regular Storm topology as Flink program</li>
+ * <li>make use of the FiniteStormSpout interface</li>
+ * </ul>
*/
public class ExclamationTopology {
@@ -36,7 +53,7 @@ public class ExclamationTopology {
public final static String firstBoltId = "exclamation1";
public final static String secondBoltId = "exclamation2";
public final static String sinkId = "sink";
- private final static OutputFormatter formatter = new RawOutputFormatter();
+ private final static OutputFormatter formatter = new SimpleOutputFormatter();
public static FlinkTopologyBuilder buildTopology() {
final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
@@ -46,9 +63,9 @@ public class ExclamationTopology {
// read the text file from given input path
final String[] tokens = textPath.split(":");
final String inputFile = tokens[tokens.length - 1];
- builder.setSpout(spoutId, new StormFileSpout(inputFile));
+ builder.setSpout(spoutId, new FiniteStormFileSpout(inputFile));
} else {
- builder.setSpout(spoutId, new StormInMemorySpout(WordCountData.WORDS));
+ builder.setSpout(spoutId, new FiniteStormInMemorySpout(WordCountData.WORDS));
}
builder.setBolt(firstBoltId, new ExclamationBolt(), 3).shuffleGrouping(spoutId);
@@ -59,9 +76,11 @@ public class ExclamationTopology {
// read the text file from given input path
final String[] tokens = outputPath.split(":");
final String outputFile = tokens[tokens.length - 1];
- builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter)).shuffleGrouping(secondBoltId);
+ builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter))
+ .shuffleGrouping(secondBoltId);
} else {
- builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4).shuffleGrouping(secondBoltId);
+ builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4)
+ .shuffleGrouping(secondBoltId);
}
return builder;
@@ -84,13 +103,17 @@ public class ExclamationTopology {
textPath = args[0];
outputPath = args[1];
} else {
- System.err.println("Usage: StormExclamation* <text path> <result path>");
+ System.err.println(
+ "Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text " +
+ "path> <result path>");
return false;
}
} else {
- System.out.println("Executing StormExclamation* example with built-in default data");
+ System.out.println("Executing StormExclamation example with built-in default data");
System.out.println(" Provide parameters to read input data from a file");
- System.out.println(" Usage: StormExclamation* <text path> <result path>");
+ System.out.println(
+ " Usage: StormExclamation[Local|RemoteByClient|RemoteBySubmitter] <text path>" +
+ " <result path>");
}
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
new file mode 100644
index 0000000..7bcb7f9
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.excamation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "Exclamation" program that attaches five exclamation marks to every line of a text
+ * file in a streaming fashion. The program is constructed as a regular {@link StormTopology}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>ExclamationWithStormBolt <text path> <result path></code><br/>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Storm bolt within a Flink Streaming program</li>
+ * </ul>
+ */
+public class ExclamationWithStormBolt {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ final DataStream<String> text = getTextDataStream(env);
+
+ final DataStream<String> exclaimed = text
+ .transform("StormBoltTokenizer",
+ TypeExtractor.getForObject(""),
+ new StormBoltWrapper<String, String>(new ExclamationBolt(), true))
+ .map(new ExclamationMap());
+
+ // emit result
+ if (fileOutput) {
+ exclaimed.writeAsText(outputPath);
+ } else {
+ exclaimed.print();
+ }
+
+ // execute program
+ env.execute("Streaming Exclamation with Storm bolt");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ private static class ExclamationMap implements MapFunction<String, String> {
+
+ @Override
+ public String map(String value) throws Exception {
+ return value + "!!!";
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(final String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: ExclamationWithStormBolt <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing ExclamationWithStormBolt example with built-in default data");
+ System.out.println(" Provide parameters to read input data from a file");
+ System.out.println(" Usage: ExclamationWithStormBolt <text path> <result path>");
+ }
+ return true;
+ }
+
+ private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+ if (fileOutput) {
+ // read the text file from given input path
+ return env.readTextFile(textPath);
+ }
+
+ return env.fromElements(WordCountData.WORDS);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
new file mode 100644
index 0000000..f027eae
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.excamation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.util.FiniteStormFileSpout;
+import org.apache.flink.stormcompatibility.util.FiniteStormInMemorySpout;
+import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "Exclamation" program that attaches five exclamation marks to every line of a text
+ * file in a streaming fashion. The program is constructed as a regular {@link StormTopology}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>ExclamationWithStormSpout <text path> <result path></code><br/>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Storm spout within a Flink Streaming program</li>
+ * <li>make use of the FiniteStormSpout interface</li>
+ * </ul>
+ */
+public class ExclamationWithStormSpout {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ final DataStream<String> text = getTextDataStream(env);
+
+ final DataStream<String> exclaimed = text
+ .map(new ExclamationMap())
+ .map(new ExclamationMap());
+
+ // emit result
+ if (fileOutput) {
+ exclaimed.writeAsText(outputPath);
+ } else {
+ exclaimed.print();
+ }
+
+ // execute program
+ env.execute("Streaming Exclamation with Storm spout source");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ private static class ExclamationMap implements MapFunction<String, String> {
+
+ @Override
+ public String map(String value) throws Exception {
+ return value + "!!!";
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(final String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: ExclamationWithStormSpout <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing ExclamationWithStormSpout example with built-in default " +
+ "data");
+ System.out.println(" Provide parameters to read input data from a file");
+ System.out.println(" Usage: ExclamationWithStormSpout <text path> <result path>");
+ }
+ return true;
+ }
+
+ private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+ if (fileOutput) {
+ // read the text file from given input path
+ final String[] tokens = textPath.split(":");
+ final String localFile = tokens[tokens.length - 1];
+ return env.addSource(
+ new FiniteStormSpoutWrapper<String>(new FiniteStormFileSpout(localFile), true),
+ TypeExtractor.getForClass(String.class)).setParallelism(1);
+ }
+
+ return env.addSource(
+ new FiniteStormSpoutWrapper<String>(
+ new FiniteStormInMemorySpout(WordCountData.WORDS), true),
+ TypeExtractor.getForClass(String.class)).setParallelism(1);
+
+ }
+
+}
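The transformation in `ExclamationWithStormSpout` chains two `ExclamationMap` stages, each appending `"!!!"`. A minimal sketch of that chaining with plain Java streams follows; it assumes nothing about Flink, and `ExclamationPipelineSketch`, `EXCLAIM`, and `exclaimTwice` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ExclamationPipelineSketch {

    /** Mirrors the body of ExclamationMap.map(): append three exclamation marks. */
    public static final Function<String, String> EXCLAIM = value -> value + "!!!";

    /** Applies the same map stage twice, as the example program does. */
    public static List<String> exclaimTwice(List<String> lines) {
        return lines.stream()
                .map(EXCLAIM)
                .map(EXCLAIM)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [hello!!!!!!, world!!!!!!]
        System.out.println(exclaimTwice(Arrays.asList("hello", "world")));
    }
}
```

As written, two chained stages of three marks each append six marks per line in this sketch.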
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java
deleted file mode 100644
index 52e740c..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormBoltExclamation.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
-import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class StormBoltExclamation {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(final String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- // set up the execution environment
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // get input data
- final DataStream<String> text = getTextDataStream(env);
-
- final DataStream<String> exclaimed = text
- .transform("StormBoltTokenizer",
- TypeExtractor.getForObject(""),
- new StormBoltWrapper<String, String>(new ExclamationBolt(), true))
- .map(new ExclamationMap());
-
- // emit result
- if (fileOutput) {
- exclaimed.writeAsText(outputPath);
- } else {
- exclaimed.print();
- }
-
- // execute program
- env.execute("Streaming WordCount with Storm bolt tokenizer");
- }
-
- // *************************************************************************
- // USER FUNCTIONS
- // *************************************************************************
-
- private static class ExclamationMap implements MapFunction<String, String> {
-
- @Override
- public String map(String value) throws Exception {
- return value + "!!!";
- }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String textPath;
- private static String outputPath;
-
- private static boolean parseParameters(final String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- fileOutput = true;
- if (args.length == 2) {
- textPath = args[0];
- outputPath = args[1];
- } else {
- System.err.println("Usage: StormBoltExclamation <text path> <result path>");
- return false;
- }
- } else {
- System.out.println("Executing StormBoltExclamation example with built-in default data");
- System.out.println(" Provide parameters to read input data from a file");
- System.out.println(" Usage: StormBoltExclamation <text path> <result path>");
- }
- return true;
- }
-
- private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
- if (fileOutput) {
- // read the text file from given input path
- return env.readTextFile(textPath);
- }
-
- return env.fromElements(WordCountData.WORDS);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
index a25e5e0..5941ff0 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationLocal.java
@@ -21,6 +21,27 @@ import backtype.storm.utils.Utils;
import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+/**
+ * Implements the "Exclamation" program that attaches five exclamation marks to every line of a text
+ * file in a streaming fashion. The program is constructed as a regular {@link StormTopology} and
+ * submitted to Flink for execution in the same way as to a Storm {@link LocalCluster}.
+ * <p/>
+ * This example shows how to run the program directly within Java; thus, it cannot be used to submit
+ * a {@link StormTopology} via Flink command line clients (i.e., bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>StormExclamationLocal <text path> <result path></code><br/>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>run a regular Storm program locally on Flink</li>
+ * </ul>
+ */
public class StormExclamationLocal {
public final static String topologyId = "Streaming Exclamation";
@@ -43,10 +64,6 @@ public class StormExclamationLocal {
cluster.submitTopology(topologyId, null, builder.createTopology());
Utils.sleep(10 * 1000);
-
- // TODO kill does no do anything so far
- cluster.killTopology(topologyId);
- cluster.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
index 3f55316..0f64301 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteByClient.java
@@ -25,6 +25,28 @@ import backtype.storm.utils.Utils;
import org.apache.flink.stormcompatibility.api.FlinkClient;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+/**
+ * Implements the "Exclamation" program that attaches five exclamation marks to every line of a text
+ * file in a streaming fashion. The program is constructed as a regular {@link StormTopology} and
+ * submitted to Flink for execution in the same way as to a Storm cluster, similar to
+ * {@link NimbusClient}. The Flink cluster can be local or remote.
+ * <p/>
+ * This example shows how to submit the program via Java; thus, it cannot be used to submit a
+ * {@link StormTopology} via Flink command line clients (i.e., bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>StormExclamationRemoteByClient <text path> <result path></code><br/>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
+ * </ul>
+ */
public class StormExclamationRemoteByClient {
public final static String topologyId = "Streaming Exclamation";
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
index 728c5c7..d580520 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormExclamationRemoteBySubmitter.java
@@ -22,6 +22,27 @@ import org.apache.flink.stormcompatibility.api.FlinkClient;
import org.apache.flink.stormcompatibility.api.FlinkSubmitter;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+/**
+ * Implements the "Exclamation" program that attaches five exclamation marks to every line of a text
+ * file in a streaming fashion. The program is constructed as a regular {@link StormTopology} and
+ * submitted to Flink for execution in the same way as to a Storm cluster, similar to
+ * {@link StormSubmitter}. The Flink cluster can be local or remote.
+ * <p/>
+ * This example shows how to submit the program via Java as well as Flink's command line client (i.e., bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>StormExclamationRemoteBySubmitter <text path> <result path></code><br/>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
+ * </ul>
+ */
public class StormExclamationRemoteBySubmitter {
public final static String topologyId = "Streaming Exclamation";
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java
deleted file mode 100644
index 2569fa7..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/StormSpoutExclamation.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.excamation;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.util.StormFileSpout;
-import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
-import org.apache.flink.stormcompatibility.wrappers.StormFiniteSpoutWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class StormSpoutExclamation {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(final String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- // set up the execution environment
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // get input data
- final DataStream<String> text = getTextDataStream(env);
-
- final DataStream<String> exclaimed = text
- .map(new ExclamationMap())
- .map(new ExclamationMap());
-
- // emit result
- if (fileOutput) {
- exclaimed.writeAsText(outputPath);
- } else {
- exclaimed.print();
- }
-
- // execute program
- env.execute("Streaming Exclamation with Storm spout source");
- }
-
- // *************************************************************************
- // USER FUNCTIONS
- // *************************************************************************
-
- private static class ExclamationMap implements MapFunction<String, String> {
-
- @Override
- public String map(String value) throws Exception {
- return value + "!!!";
- }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String textPath;
- private static String outputPath;
-
- private static boolean parseParameters(final String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- fileOutput = true;
- if (args.length == 2) {
- textPath = args[0];
- outputPath = args[1];
- } else {
- System.err.println("Usage: StormSpoutExclamation <text path> <result path>");
- return false;
- }
- } else {
- System.out.println("Executing StormSpoutExclamation example with built-in default data");
- System.out.println(" Provide parameters to read input data from a file");
- System.out.println(" Usage: StormSpoutExclamation <text path> <result path>");
- }
- return true;
- }
-
- private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
- if (fileOutput) {
- // read the text file from given input path
- final String[] tokens = textPath.split(":");
- final String localFile = tokens[tokens.length - 1];
- return env.addSource(
- new StormFiniteSpoutWrapper<String>(new StormFileSpout(localFile), true),
- TypeExtractor.getForClass(String.class)).setParallelism(1);
- }
-
- return env.addSource(new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(WordCountData.WORDS), true),
- TypeExtractor.getForClass(String.class));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
new file mode 100644
index 0000000..d45ad76
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Implements a Storm Spout that reads data from a given local file. The spout stops automatically
+ * when it reaches the end of the file.
+ */
+public class FiniteStormFileSpout extends AbstractStormSpout implements FiniteStormSpout {
+ private static final long serialVersionUID = -6996907090003590436L;
+
+ private final String path;
+ private BufferedReader reader;
+ private String line;
+ private boolean newLineRead;
+
+ public FiniteStormFileSpout(final String path) {
+ this.path = path;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void open(final Map conf, final TopologyContext context,
+ final SpoutOutputCollector collector) {
+ super.open(conf, context, collector);
+ try {
+ this.reader = new BufferedReader(new FileReader(this.path));
+ } catch (final FileNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ newLineRead = false;
+ }
+
+ @Override
+ public void close() {
+ if (this.reader != null) {
+ try {
+ this.reader.close();
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void nextTuple() {
+ this.collector.emit(new Values(line));
+ newLineRead = false;
+ }
+
+ /**
+ * Must be called at least once before each nextTuple() call; additional calls do not skip lines.
+ */
+ public boolean reachedEnd() {
+ try {
+ readLine();
+ } catch (IOException e) {
+ throw new RuntimeException("Exception occurred while reading file " + path, e);
+ }
+ return line == null;
+ }
+
+ private void readLine() throws IOException {
+ if (!newLineRead) {
+ line = reader.readLine();
+ newLineRead = true;
+ }
+ }
+
+}
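
The look-ahead pattern used by FiniteStormFileSpout (buffer one line in reachedEnd(), hand it out in nextTuple()) can be sketched without any Storm dependency. This is a minimal illustration, not the committed code: the class LookAheadLineSource and its methods next() and drain() are hypothetical names, and unlike the spout above, next() buffers a line itself, so it also works when reachedEnd() was never called.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the spout; it reads from any BufferedReader.
class LookAheadLineSource {
    private final BufferedReader reader;
    private String line;
    private boolean lineBuffered = false;

    LookAheadLineSource(BufferedReader reader) {
        this.reader = reader;
    }

    // Buffers the next line at most once, so repeated calls never skip input.
    boolean reachedEnd() {
        if (!lineBuffered) {
            try {
                line = reader.readLine();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            lineBuffered = true;
        }
        return line == null;
    }

    // Returns the buffered line and invalidates the buffer for the next call.
    String next() {
        if (reachedEnd()) {
            throw new NoSuchElementException("no more lines");
        }
        lineBuffered = false;
        return line;
    }

    // Driver loop in the style a finite-source wrapper might use (assumption).
    static List<String> drain(String text) {
        LookAheadLineSource source =
                new LookAheadLineSource(new BufferedReader(new StringReader(text)));
        List<String> out = new ArrayList<>();
        while (!source.reachedEnd()) {
            out.add(source.next());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain("first\nsecond\n"));
    }
}
```

Calling reachedEnd() inside next() trades one extra boolean check per tuple for independence from the caller's call order.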
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
new file mode 100644
index 0000000..899c569
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.tuple.Values;
+import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
+
+/**
+ * Implements a Storm Spout that reads String[] data stored in memory. The spout stops
+ * automatically when it has emitted all of the data.
+ */
+public class FiniteStormInMemorySpout extends AbstractStormSpout implements FiniteStormSpout {
+
+ private static final long serialVersionUID = -4008858647468647019L;
+
+ private String[] source;
+ private int counter = 0;
+
+ public FiniteStormInMemorySpout(String[] source) {
+ this.source = source;
+ }
+
+ @Override
+ public void nextTuple() {
+ this.collector.emit(new Values(source[this.counter++]));
+ }
+
+ public boolean reachedEnd() {
+ return counter >= source.length;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
index bfc3135..ec9adfe 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
@@ -18,12 +18,19 @@
package org.apache.flink.stormcompatibility.util;
-import java.io.Serializable;
-
import backtype.storm.tuple.Tuple;
+import java.io.Serializable;
+
public interface OutputFormatter extends Serializable {
+ /**
+ * Converts a Storm {@link Tuple} to a string. This method is used for formatting the output
+ * tuples before writing them out to a file or to the console.
+ *
+ * @param input The tuple to be formatted
+ * @return The string result of the formatting
+ */
public String format(Tuple input);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java
deleted file mode 100644
index 7faf6cd..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.tuple.Tuple;
-
-public class RawOutputFormatter implements OutputFormatter {
- private static final long serialVersionUID = 8685668993521259832L;
-
- @Override
- public String format(final Tuple input) {
- assert (input.size() == 1);
- return input.getValue(0).toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
index ccb617b..0702e94 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
@@ -23,9 +23,20 @@ import backtype.storm.tuple.Tuple;
public class SimpleOutputFormatter implements OutputFormatter {
private static final long serialVersionUID = 6349573860144270338L;
+ /**
+ * Converts a Storm {@link Tuple} with 1 field to a string by retrieving the value of that
+ * field. This method is used for formatting raw outputs wrapped in tuples, before writing them
+ * out to a file or to the console.
+ *
+ * @param input
+ * The tuple to be formatted
+ * @return The string result of the formatting
+ */
@Override
public String format(final Tuple input) {
- return input.getValues().toString();
+ if (input.getValues().size() != 1) {
+ throw new RuntimeException("The output is not raw");
+ }
+ return input.getValue(0).toString();
}
-
}
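
The single-field check that SimpleOutputFormatter now performs can be illustrated in isolation. The sketch below is an assumption-laden stand-in, not the committed class: SingleFieldFormatter is a hypothetical name, a plain List<Object> replaces Storm's Tuple, and it throws IllegalArgumentException with the observed field count instead of a bare RuntimeException.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in: a List<Object> plays the role of a Storm Tuple's values.
class SingleFieldFormatter {

    // Returns the string form of the only field; rejects any other arity.
    static String format(List<Object> values) {
        if (values.size() != 1) {
            throw new IllegalArgumentException(
                    "Expected exactly 1 field but got " + values.size());
        }
        // String.valueOf renders a null field as "null" instead of throwing.
        return String.valueOf(values.get(0));
    }

    public static void main(String[] args) {
        System.out.println(format(Arrays.<Object>asList("hello!!!")));
    }
}
```

Reporting the actual field count in the message makes a misconfigured topology easier to diagnose than a fixed message would.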
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
index 8f4503f..eab58f5 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
@@ -40,7 +40,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* <p/>
* This example shows how to:
* <ul>
- * <li>use a Storm bolt within a Flink Streaming program.
+ * <li>use a Storm bolt within a Flink Streaming program.</li>
* </ul>
*/
public class BoltTokenizerWordCount {
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
index 361d83a..4c012d8 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
@@ -43,7 +43,7 @@ import org.apache.flink.util.Collector;
* <p/>
* This example shows how to:
* <ul>
- * <li>use a Storm bolt within a Flink Streaming program.
+ * <li>use a Storm spout within a Flink Streaming program.</li>
* </ul>
*/
public class SpoutSourceWordCount {
@@ -145,7 +145,7 @@ public class SpoutSourceWordCount {
}
return env.addSource(new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(WordCountData.WORDS), true),
- TypeExtractor.getForClass(String.class));
+ TypeExtractor.getForClass(String.class)).setParallelism(1);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
index 3fbd5b7..836c8e9 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
@@ -42,7 +42,7 @@ import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
* <p/>
* This example shows how to:
* <ul>
- * <li>run a regular Storm program locally on Flink
+ * <li>run a regular Storm program locally on Flink</li>
* </ul>
*/
public class StormWordCountLocal {
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
index 9e56c14..0bbe11b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
@@ -46,7 +46,7 @@ import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
* <p/>
* This example shows how to:
* <ul>
- * <li>submit a regular Storm program to a local or remote Flink cluster.
+ * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
* </ul>
*/
public class StormWordCountRemoteByClient {
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
index a1fb79d..264dc41 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
@@ -42,7 +42,7 @@ import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
* <p/>
* This example shows how to:
* <ul>
- * <li>submit a regular Storm program to a local or remote Flink cluster.
+ * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
* </ul>
*/
public class StormWordCountRemoteBySubmitter {
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
index f028266..367ca9e 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
@@ -47,7 +47,7 @@ import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTok
* <p/>
* This example shows how to:
* <ul>
- * <li>how to construct a regular Storm topology as Flink program
+ * <li>construct a regular Storm topology as a Flink program</li>
* </ul>
*/
public class WordCountTopology {
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
new file mode 100644
index 0000000..930f87b
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormBoltITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.exclamation;
+
+import org.apache.flink.stormcompatibility.excamation.ExclamationWithStormBolt;
+import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class ExclamationWithStormBoltITCase extends StreamingProgramTestBase {
+
+ protected String textPath;
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+ this.resultPath = this.getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ ExclamationWithStormBolt.main(new String[]{this.textPath, this.resultPath});
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
new file mode 100644
index 0000000..4c515ce
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/ExclamationWithStormSpoutITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.exclamation;
+
+import org.apache.flink.stormcompatibility.excamation.ExclamationWithStormSpout;
+import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class ExclamationWithStormSpoutITCase extends StreamingProgramTestBase {
+
+ protected String textPath;
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+ this.resultPath = this.getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ ExclamationWithStormSpout.main(new String[]{this.textPath, this.resultPath});
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
deleted file mode 100644
index 75dd5fc..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.exclamation;
-
-import org.apache.flink.stormcompatibility.excamation.StormBoltExclamation;
-import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class StormBoltExclamationITCase extends StreamingProgramTestBase {
-
- protected String textPath;
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
- this.resultPath = this.getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- StormBoltExclamation.main(new String[]{this.textPath, this.resultPath});
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/dba2946f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
deleted file mode 100644
index 2b08b4b..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.exclamation;
-
-import org.apache.flink.stormcompatibility.excamation.StormSpoutExclamation;
-import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class StormSpoutExclamationITCase extends StreamingProgramTestBase {
-
- protected String textPath;
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
- this.resultPath = this.getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- StormSpoutExclamation.main(new String[]{this.textPath, this.resultPath});
- }
-
-}
[2/2] flink git commit: [FLINK-2243] [storm-compat] Added finite
spout functionality to Storm compatibility layer
Posted by mb...@apache.org.
[FLINK-2243] [storm-compat] Added finite spout functionality to Storm compatibility layer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d9eeb55
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d9eeb55
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d9eeb55
Branch: refs/heads/master
Commit: 6d9eeb559a895d92bb0a71c6535c55dfb49a16cb
Parents: f1dd914
Author: szape <ne...@gmail.com>
Authored: Fri Jun 19 10:22:04 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri Aug 7 14:50:02 2015 +0200
----------------------------------------------------------------------
.../api/FlinkTopologyBuilder.java | 13 ++-
.../wrappers/AbstractStormSpoutWrapper.java | 1 -
.../wrappers/FiniteStormSpout.java | 37 +++++++++
.../wrappers/FiniteStormSpoutWrapper.java | 87 ++++++++++++++++++++
.../wrappers/FiniteStormSpoutWrapperTest.java | 69 ++++++++++++++++
5 files changed, 205 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index 4ecf4a6..d146250 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -33,6 +33,9 @@ import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper;
+import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
+import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -93,7 +96,15 @@ public class FlinkTopologyBuilder {
* -> add an additional output attribute tagging the output stream, and use .split() and .select() to split
* the streams
*/
- final DataStreamSource source = env.addSource(new StormSpoutWrapper(userSpout), declarer.getOutputType());
+ AbstractStormSpoutWrapper spoutWrapper;
+
+ if (userSpout instanceof FiniteStormSpout) {
+ spoutWrapper = new FiniteStormSpoutWrapper((FiniteStormSpout) userSpout);
+ } else {
+ spoutWrapper = new StormSpoutWrapper(userSpout);
+ }
+
+ final DataStreamSource source = env.addSource(spoutWrapper, declarer.getOutputType());
availableOperators.put(spoutId, source);
int dop = 1;
http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
index 3021bcb..4e43a8a 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
@@ -19,7 +19,6 @@ package org.apache.flink.stormcompatibility.wrappers;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.IRichSpout;
-
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
new file mode 100644
index 0000000..58a4f7a
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.topology.IRichSpout;
+
+/**
+ * This interface represents a Storm spout that emits a finite number of records. Common Storm
+ * spouts emit infinite streams by default. To change this behaviour and take advantage of
+ * Flink's finite-source capabilities, the spout should implement this interface. To wrap
+ * {@link FiniteStormSpout} separately, use {@link FiniteStormSpoutWrapper}.
+ */
+public interface FiniteStormSpout extends IRichSpout {
+
+ /**
+ * Returns true once the spout has reached the end of its stream.
+ *
+ * @return true, if the spout's stream reached its end, false otherwise
+ */
+ public boolean reachedEnd();
+
+}
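The contract above can be illustrated with a minimal, self-contained sketch. It does not use the Storm API: FiniteSource is a hypothetical stand-in for FiniteStormSpout, reduced to the two methods the wrapper calls, and InMemorySource and drain are illustrative names that do not appear in the commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class FiniteSpoutSketch {

    /** Hypothetical stand-in for FiniteStormSpout (not the real Storm/Flink interface). */
    interface FiniteSource {
        void nextTuple();
        boolean reachedEnd();
    }

    /** Emits each element of a fixed list once, then reports the end of the stream. */
    static final class InMemorySource implements FiniteSource {
        private final Iterator<String> remaining;
        private final List<String> emitted;

        InMemorySource(List<String> data, List<String> emitted) {
            this.remaining = data.iterator();
            this.emitted = emitted;
        }

        @Override
        public void nextTuple() {
            // Ignore extra calls once the data is exhausted.
            if (remaining.hasNext()) {
                emitted.add(remaining.next());
            }
        }

        @Override
        public boolean reachedEnd() {
            return !remaining.hasNext();
        }
    }

    /** Pulls from the source until it reports the end, collecting what was emitted. */
    public static List<String> drain(List<String> data) {
        List<String> out = new ArrayList<>();
        FiniteSource source = new InMemorySource(data, out);
        while (!source.reachedEnd()) {
            source.nextTuple();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(Arrays.asList("hello", "flink")));
    }
}
```

A real spout would emit through Storm's SpoutOutputCollector rather than into a list; the list is only there to keep the sketch runnable without Storm on the classpath.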
http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
new file mode 100644
index 0000000..7913510
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+/**
+ * A {@link FiniteStormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped
+ * {@link FiniteStormSpout}'s {@link FiniteStormSpout#nextTuple()} method until {@link
+ * FiniteStormSpout#reachedEnd()} is true.
+ */
+public class FiniteStormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
+ private static final long serialVersionUID = -218340336648247605L;
+
+ private FiniteStormSpout finiteSpout;
+
+ /**
+ * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link
+ * FiniteStormSpout spout} such that it can be used within a Flink streaming program. The
+ * output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's
+ * declared number of attributes.
+ *
+ * @param spout
+ * The Storm {@link FiniteStormSpout spout} to be used.
+ * @throws IllegalArgumentException
+ * If the number of declared output attributes is not within the range
+ * [1;25].
+ */
+ public FiniteStormSpoutWrapper(FiniteStormSpout spout)
+ throws IllegalArgumentException {
+ super(spout);
+ this.finiteSpout = spout;
+ }
+
+ /**
+ * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link
+ * FiniteStormSpout spout} such that it can be used within a Flink streaming program. The
+ * output
+ * type can be any type if parameter {@code rawOutput} is {@code true} and the spout's
+ * number of
+ * declared output attributes is 1. If {@code rawOutput} is {@code false}, the output type will be
+ * one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
+ * attributes.
+ *
+ * @param spout
+ * The Storm {@link FiniteStormSpout spout} to be used.
+ * @param rawOutput
+ * Set to {@code true} if a single-attribute output stream should not be of type {@link
+ * Tuple1} but of a raw type.
+ * @throws IllegalArgumentException
+ * If {@code rawOutput} is {@code true} and the number of declared output attributes is
+ * not 1,
+ * or if {@code rawOutput} is {@code false} and the number of declared output attributes
+ * is not
+ * within range [1;25].
+ */
+ public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final boolean rawOutput)
+ throws IllegalArgumentException {
+ super(spout, rawOutput);
+ this.finiteSpout = spout;
+ }
+
+ /**
+ * Calls the {@link FiniteStormSpout#nextTuple()} method until {@link
+ * FiniteStormSpout#reachedEnd()} is true or {@link #cancel()} is called.
+ */
+ @Override
+ protected void execute() {
+ while (super.isRunning && !finiteSpout.reachedEnd()) {
+ finiteSpout.nextTuple();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d9eeb55/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
new file mode 100644
index 0000000..776e65d
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StormWrapperSetupHelper.class)
+public class FiniteStormSpoutWrapperTest {
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void runAndExecuteTest1() throws Exception {
+
+ FiniteStormSpout stormSpout =
+ mock(FiniteStormSpout.class);
+ when(stormSpout.reachedEnd()).thenReturn(false, false, false, true, false, false, true);
+
+ FiniteStormSpoutWrapper<?> wrapper =
+ new FiniteStormSpoutWrapper<Object>(stormSpout);
+ wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+
+ wrapper.run(mock(SourceContext.class));
+ verify(stormSpout, times(3)).nextTuple();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void runAndExecuteTest2() throws Exception {
+
+ FiniteStormSpout stormSpout =
+ mock(FiniteStormSpout.class);
+ when(stormSpout.reachedEnd()).thenReturn(true, false, true, false, true, false, true);
+
+ FiniteStormSpoutWrapper<?> wrapper =
+ new FiniteStormSpoutWrapper<Object>(stormSpout);
+ wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+
+ wrapper.run(mock(SourceContext.class));
+ verify(stormSpout, never()).nextTuple();
+ }
+
+}
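The expected counts in the two tests above (three calls, then none) follow from how the wrapper's loop consumes the stubbed reachedEnd() answers. The sketch below replays those answers through a loop of the same shape without Mockito. It assumes the running flag stays true for the whole run, and countNextTupleCalls is a hypothetical helper, not part of the commit. Unlike Mockito, which keeps returning the last stubbed value, the sketch simply stops when the answers run out.

```java
public class StubbedSequenceSketch {

    /**
     * Replays a sequence of reachedEnd() answers through a loop shaped like
     * FiniteStormSpoutWrapper#execute() and counts how often nextTuple() would run.
     */
    public static int countNextTupleCalls(boolean... reachedEndAnswers) {
        int index = 0;
        int nextTupleCalls = 0;
        // Each loop test consumes one answer; the first 'true' ends the loop.
        while (index < reachedEndAnswers.length && !reachedEndAnswers[index++]) {
            nextTupleCalls++;
        }
        return nextTupleCalls;
    }

    public static void main(String[] args) {
        // Same answer sequences as runAndExecuteTest1 and runAndExecuteTest2.
        System.out.println(countNextTupleCalls(false, false, false, true, false, false, true));
        System.out.println(countNextTupleCalls(true, false, true, false, true, false, true));
    }
}
```

Answers after the first true are never consumed, which is why the trailing values in both stubbed sequences do not affect the verified call counts.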