You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/06/01 10:57:07 UTC
[03/11] flink git commit: [FLINK-6695] Activate strict checkstyle for flink-storm-examples
[FLINK-6695] Activate strict checkstyle for flink-storm-examples
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a2d984f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a2d984f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a2d984f
Branch: refs/heads/master
Commit: 2a2d984f7258e62eff34b4deba3f803529553227
Parents: 40cb093
Author: zentol <ch...@apache.org>
Authored: Tue May 23 23:47:00 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 1 11:14:11 2017 +0200
----------------------------------------------------------------------
flink-contrib/flink-storm-examples/pom.xml | 35 ++++++++++++++++++++
.../storm/exclamation/ExclamationLocal.java | 25 +++++++-------
.../storm/exclamation/ExclamationTopology.java | 25 +++++++-------
.../storm/exclamation/ExclamationWithBolt.java | 15 +++++----
.../storm/exclamation/ExclamationWithSpout.java | 15 +++++----
.../exclamation/operators/ExclamationBolt.java | 8 +++--
.../flink/storm/join/SingleJoinExample.java | 17 +++++-----
.../flink/storm/print/PrintSampleStream.java | 14 ++++----
.../flink/storm/split/SpoutSplitExample.java | 11 +++---
.../storm/split/operators/RandomSpout.java | 10 ++++--
.../split/operators/VerifyAndEnrichBolt.java | 9 +++--
.../flink/storm/util/AbstractBoltSink.java | 4 +--
.../flink/storm/util/AbstractLineSpout.java | 2 +-
.../org/apache/flink/storm/util/FileSpout.java | 6 ++--
.../flink/storm/util/FiniteFileSpout.java | 1 +
.../flink/storm/util/OutputFormatter.java | 8 +++--
.../flink/storm/util/SimpleOutputFormatter.java | 6 +++-
.../flink/storm/util/TupleOutputFormatter.java | 4 +++
.../storm/wordcount/BoltTokenizerWordCount.java | 15 +++++----
.../wordcount/BoltTokenizerWordCountPojo.java | 15 +++++----
.../BoltTokenizerWordCountWithNames.java | 19 ++++++-----
.../storm/wordcount/SpoutSourceWordCount.java | 18 +++++-----
.../flink/storm/wordcount/WordCountLocal.java | 26 +++++++--------
.../storm/wordcount/WordCountLocalByName.java | 26 +++++++--------
.../wordcount/WordCountRemoteByClient.java | 28 ++++++++--------
.../wordcount/WordCountRemoteBySubmitter.java | 28 ++++++++--------
.../storm/wordcount/WordCountTopology.java | 30 ++++++++---------
.../storm/wordcount/operators/BoltCounter.java | 4 +--
.../wordcount/operators/BoltCounterByName.java | 4 +--
.../wordcount/operators/BoltTokenizer.java | 4 +--
.../operators/BoltTokenizerByName.java | 4 +--
.../wordcount/operators/WordCountDataPojos.java | 12 +++++--
.../wordcount/operators/WordCountDataTuple.java | 5 ++-
.../operators/WordCountInMemorySpout.java | 5 +--
.../exclamation/ExclamationWithBoltITCase.java | 3 ++
.../exclamation/ExclamationWithSpoutITCase.java | 3 ++
.../StormExclamationLocalITCase.java | 3 ++
.../storm/exclamation/util/ExclamationData.java | 3 ++
.../flink/storm/join/SingleJoinITCase.java | 8 +++--
.../org/apache/flink/storm/split/SplitBolt.java | 8 +++--
.../flink/storm/split/SplitBoltTopology.java | 19 +++++++----
.../apache/flink/storm/split/SplitITCase.java | 11 ++++--
.../flink/storm/split/SplitSpoutTopology.java | 18 ++++++----
.../flink/storm/split/SplitStreamBoltLocal.java | 11 ++++--
.../storm/split/SplitStreamSpoutLocal.java | 11 ++++--
.../storm/tests/StormFieldsGroupingITCase.java | 24 ++++++++------
.../flink/storm/tests/StormMetaDataITCase.java | 18 ++++++----
.../flink/storm/tests/StormUnionITCase.java | 22 +++++++-----
.../tests/operators/FiniteRandomSpout.java | 11 ++++--
.../flink/storm/tests/operators/MergerBolt.java | 8 +++--
.../storm/tests/operators/MetaDataSpout.java | 8 +++--
.../flink/storm/tests/operators/TaskIdBolt.java | 5 +--
.../tests/operators/VerifyMetaDataBolt.java | 8 +++--
.../wordcount/BoltTokenizerWordCountITCase.java | 3 ++
.../BoltTokenizerWordCountPojoITCase.java | 3 ++
.../BoltTokenizerWordCountWithNamesITCase.java | 3 ++
.../wordcount/SpoutSourceWordCountITCase.java | 3 ++
.../storm/wordcount/WordCountLocalITCase.java | 3 ++
.../wordcount/WordCountLocalNamedITCase.java | 4 ++-
59 files changed, 425 insertions(+), 254 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/pom.xml b/flink-contrib/flink-storm-examples/pom.xml
index 6ef0f7b..0296ff3 100644
--- a/flink-contrib/flink-storm-examples/pom.xml
+++ b/flink-contrib/flink-storm-examples/pom.xml
@@ -384,6 +384,41 @@ under the License.
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>6.19</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+ <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ <executions>
+ <!--
+ Execute checkstyle after compilation but before tests.
+
+ This ensures that any parsing or type checking errors are from
+ javac, so they look as expected. Beyond that, we want to
+ fail as early as possible.
+ -->
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
<pluginManagement>
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java
index c37ae65..6108f79 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java
@@ -17,34 +17,35 @@
package org.apache.flink.storm.exclamation;
-import org.apache.storm.Config;
-import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
+import org.apache.storm.Config;
+import org.apache.storm.topology.TopologyBuilder;
+
/**
* Implements the "Exclamation" program that attaches five exclamation mark to every line of a text files in a streaming
* fashion. The program is constructed as a regular {@link org.apache.storm.generated.StormTopology} and submitted to
* Flink for execution in the same way as to a Storm {@link org.apache.storm.LocalCluster}.
- * <p>
- * This example shows how to run program directly within Java, thus it cannot be used to submit a
+ *
+ * <p>This example shows how to run program directly within Java, thus it cannot be used to submit a
* {@link org.apache.storm.generated.StormTopology} via Flink command line clients (ie, bin/flink).
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage: <code>ExclamationLocal <text path> <result path></code><br>
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>ExclamationLocal <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from
* {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>run a regular Storm program locally on Flink</li>
* </ul>
*/
public class ExclamationLocal {
- public final static String topologyId = "Streaming Exclamation";
+ public static final String TOPOLOGY_ID = "Streaming Exclamation";
// *************************************************************************
// PROGRAM
@@ -65,7 +66,7 @@ public class ExclamationLocal {
conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
- cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
+ cluster.submitTopology(TOPOLOGY_ID, conf, FlinkTopology.createTopology(builder));
cluster.shutdown();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java
index 0144acb..51edd1f 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java
@@ -17,7 +17,6 @@
package org.apache.flink.storm.exclamation;
-import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
import org.apache.flink.storm.util.BoltFileSink;
@@ -27,17 +26,19 @@ import org.apache.flink.storm.util.FiniteInMemorySpout;
import org.apache.flink.storm.util.OutputFormatter;
import org.apache.flink.storm.util.SimpleOutputFormatter;
+import org.apache.storm.topology.TopologyBuilder;
+
/**
* Implements the "Exclamation" program that attaches two exclamation marks to every line of a text files in a streaming
* fashion. The program is constructed as a regular {@link org.apache.storm.generated.StormTopology}.
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage: <code>Exclamation[Local|RemoteByClient|RemoteBySubmitter] <text path>
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>Exclamation[Local|RemoteByClient|RemoteBySubmitter] <text path>
* <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>construct a regular Storm topology as Flink program</li>
* <li>make use of the FiniteSpout interface</li>
@@ -45,11 +46,11 @@ import org.apache.flink.storm.util.SimpleOutputFormatter;
*/
public class ExclamationTopology {
- public final static String spoutId = "source";
- public final static String firstBoltId = "exclamation1";
- public final static String secondBoltId = "exclamation2";
- public final static String sinkId = "sink";
- private final static OutputFormatter formatter = new SimpleOutputFormatter();
+ private static final String spoutId = "source";
+ private static final String firstBoltId = "exclamation1";
+ private static final String secondBoltId = "exclamation2";
+ private static final String sinkId = "sink";
+ private static final OutputFormatter formatter = new SimpleOutputFormatter();
public static TopologyBuilder buildTopology() {
final TopologyBuilder builder = new TopologyBuilder();
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java
index 5a79119..a838e69 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java
@@ -18,7 +18,6 @@
package org.apache.flink.storm.exclamation;
-import org.apache.storm.utils.Utils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
@@ -28,17 +27,19 @@ import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.storm.utils.Utils;
+
/**
* Implements the "Exclamation" program that attaches 3+x exclamation marks to every line of a text files in a streaming
* fashion. The program is constructed as a regular {@link org.apache.storm.generated.StormTopology}.
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage:
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage:
* <code>ExclamationWithmBolt <text path> <result path> <number of exclamation marks></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData} with x=2.
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>use a Bolt within a Flink Streaming program</li>
* <li>how to configure a Bolt using StormConfig</li>
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java
index 237f1d4..b165f00 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java
@@ -18,7 +18,6 @@
package org.apache.flink.storm.exclamation;
-import org.apache.storm.utils.Utils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
@@ -29,16 +28,18 @@ import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.storm.utils.Utils;
+
/**
* Implements the "Exclamation" program that attaches six exclamation marks to every line of a text files in a streaming
* fashion. The program is constructed as a regular {@link org.apache.storm.generated.StormTopology}.
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage: <code>ExclamationWithSpout <text path> <result path></code><br>
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>ExclamationWithSpout <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>use a Storm spout within a Flink Streaming program</li>
* <li>make use of the FiniteSpout interface</li>
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java
index 77a91d2..8872acd 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java
@@ -28,10 +28,14 @@ import org.apache.storm.tuple.Values;
import java.util.Map;
+/**
+ * A Bolt implementation that appends exclamation marks to incoming tuples. The number of added exclamation marks can
+ * be controlled by setting <code>exclamation.count</code>.
+ */
public class ExclamationBolt implements IRichBolt {
- private final static long serialVersionUID = -6364882114201311380L;
+ private static final long serialVersionUID = -6364882114201311380L;
- public final static String EXCLAMATION_COUNT = "exclamation.count";
+ public static final String EXCLAMATION_COUNT = "exclamation.count";
private OutputCollector collector;
private String exclamation;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
index 41ea4cb..b2ad05f 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
@@ -15,13 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.storm.join;
-import org.apache.storm.Config;
-import org.apache.storm.testing.FeederSpout;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
+package org.apache.flink.storm.join;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
@@ -29,10 +24,17 @@ import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.NullTerminatingSpout;
import org.apache.flink.storm.util.TupleOutputFormatter;
+import org.apache.storm.Config;
import org.apache.storm.starter.bolt.PrinterBolt;
import org.apache.storm.starter.bolt.SingleJoinBolt;
+import org.apache.storm.testing.FeederSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
-
+/**
+ * Implements a simple example where 2 input streams are being joined.
+ */
public class SingleJoinExample {
public static void main(String[] args) throws Exception {
@@ -79,7 +81,6 @@ public class SingleJoinExample {
ageSpout.feed(new Values(i, i + 20));
}
-
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology("joinTopology", conf, FlinkTopology.createTopology(builder));
cluster.shutdown();
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
index da2e641..6157e2c 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
@@ -18,13 +18,14 @@
package org.apache.flink.storm.print;
-import org.apache.storm.Config;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
+
+import org.apache.storm.Config;
import org.apache.storm.starter.bolt.PrinterBolt;
import org.apache.storm.starter.spout.TwitterSampleSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
import java.util.Arrays;
@@ -32,15 +33,15 @@ import java.util.Arrays;
* Prints incoming tweets. Tweets can be filtered by keywords.
*/
public class PrintSampleStream {
-
+
public static void main(String[] args) throws Exception {
-
+
if (args.length < 4) {
System.err.println(
"Usage: PrintSampleStream <consumer-key> <consumer-secret> <access-token> <access-token-secret>");
return;
}
-
+
String consumerKey = args[0];
String consumerSecret = args[1];
String accessToken = args[2];
@@ -56,7 +57,6 @@ public class PrintSampleStream {
builder.setBolt("print", new PrinterBolt())
.shuffleGrouping("twitter");
-
Config conf = new Config();
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java
index 02131fc..c5bb5c3 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.storm.split;
import org.apache.flink.api.common.functions.MapFunction;
@@ -33,15 +34,15 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Implements a simple example with two declared output streams for the embedded spout.
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>handle multiple output stream of a spout</li>
* <li>accessing each stream by .split(...) and .select(...)</li>
* <li>strip wrapper data type SplitStreamType for further processing in Flink</li>
* </ul>
- * <p>
- * This example would work the same way for multiple bolt output streams.
+ *
+ * <p>This example would work the same way for multiple bolt output streams.
*/
public class SpoutSplitExample {
@@ -94,7 +95,7 @@ public class SpoutSplitExample {
/**
* Same as {@link VerifyAndEnrichBolt}.
*/
- public final static class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {
+ public static final class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {
private static final long serialVersionUID = 5213888269197438892L;
private final Tuple2<String, Integer> out;
private final boolean isEven;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
index 5fbe0a7..afec47f 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
@@ -15,10 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.storm.split.operators;
-import java.util.Map;
-import java.util.Random;
+package org.apache.flink.storm.split.operators;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -27,6 +25,12 @@ import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * A Spout implementation that emits random numbers, optionally splitting them into odd/even streams.
+ */
public class RandomSpout extends BaseRichSpout {
private static final long serialVersionUID = -3978554318742509334L;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
index 1ad9a6c..a39ec9c 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
@@ -15,9 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.storm.split.operators;
-import java.util.Map;
+package org.apache.flink.storm.split.operators;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -27,6 +26,12 @@ import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
+import java.util.Map;
+
+/**
+ * Verifies that incoming numbers are either even or odd, controlled by the constructor argument. Emitted tuples are
+ * enriched with a new string field containing either "even" or "odd", based on the number's parity.
+ */
public class VerifyAndEnrichBolt extends BaseRichBolt {
private static final long serialVersionUID = -7277395570966328721L;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java
index 2cb346a..5ae8cfb 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java
@@ -50,7 +50,7 @@ public abstract class AbstractBoltSink implements IRichBolt {
}
}
- protected abstract void prepareSimple(final Map<?, ?> stormConf, final TopologyContext context);
+ protected abstract void prepareSimple(Map<?, ?> stormConf, TopologyContext context);
@Override
public final void execute(final Tuple input) {
@@ -60,7 +60,7 @@ public abstract class AbstractBoltSink implements IRichBolt {
this.writeExternal(this.lineBuilder.toString());
}
- protected abstract void writeExternal(final String line);
+ protected abstract void writeExternal(String line);
@Override
public void cleanup() {/* nothing to do */}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java
index 29df23e..caefd56 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java
@@ -32,7 +32,7 @@ import java.util.Map;
public abstract class AbstractLineSpout implements IRichSpout {
private static final long serialVersionUID = 8876828403487806771L;
- public final static String ATTRIBUTE_LINE = "line";
+ public static final String ATTRIBUTE_LINE = "line";
protected SpoutOutputCollector collector;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
index 0a295e7..0533b09 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
@@ -33,7 +33,7 @@ import java.util.Map;
public class FileSpout extends AbstractLineSpout {
private static final long serialVersionUID = -6996907090003590436L;
- public final static String INPUT_FILE_PATH = "input.path";
+ public static final String INPUT_FILE_PATH = "input.path";
protected String path = null;
protected BufferedReader reader;
@@ -50,8 +50,8 @@ public class FileSpout extends AbstractLineSpout {
super.open(conf, context, collector);
Object configuredPath = conf.get(INPUT_FILE_PATH);
- if(configuredPath != null) {
- this.path = (String)configuredPath;
+ if (configuredPath != null) {
+ this.path = (String) configuredPath;
}
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
index 48349c2..e4f39ab 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.storm.util;
import org.apache.storm.spout.SpoutOutputCollector;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java
index fe28afc..a0f933f 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java
@@ -22,16 +22,20 @@ import org.apache.storm.tuple.Tuple;
import java.io.Serializable;
+/**
+ * Interface that is used to convert Storm {@link Tuple Tuples} to a string before writing them out to a file or to the
+ * console.
+ */
public interface OutputFormatter extends Serializable {
/**
* Converts a Storm {@link Tuple} to a string. This method is used for formatting the output tuples before writing
* them out to a file or to the console.
- *
+ *
* @param input
* The tuple to be formatted
* @return The string result of the formatting
*/
- public String format(Tuple input);
+ String format(Tuple input);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java
index 323fb53..bf30cd2 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java
@@ -20,13 +20,17 @@ package org.apache.flink.storm.util;
import org.apache.storm.tuple.Tuple;
+/**
+ * Simple {@link OutputFormatter} implementation to convert {@link Tuple Tuples} with a size of 1 by returning the
+ * result of {@link Object#toString()} for the first field.
+ */
public class SimpleOutputFormatter implements OutputFormatter {
private static final long serialVersionUID = 6349573860144270338L;
/**
* Converts a Storm {@link Tuple} with 1 field to a string by retrieving the value of that field. This method is
* used for formatting raw outputs wrapped in tuples, before writing them out to a file or to the console.
- *
+ *
* @param input
* The tuple to be formatted
* @return The string result of the formatting
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java
index 11d23cd..42189a7 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java
@@ -20,6 +20,10 @@ package org.apache.flink.storm.util;
import org.apache.storm.tuple.Tuple;
+/**
+ * {@link OutputFormatter} implementation that converts {@link Tuple Tuples} of arbitrary size to a string. For a given
+ * tuple the output is <code>(field1,field2,...,fieldX)</code>.
+ */
public class TupleOutputFormatter implements OutputFormatter {
private static final long serialVersionUID = -599665757723851761L;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
index 4620d9d..6f7addf 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
@@ -17,7 +17,6 @@
package org.apache.flink.storm.wordcount;
-import org.apache.storm.topology.IRichBolt;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
@@ -26,16 +25,18 @@ import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.storm.topology.IRichBolt;
+
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The tokenizer step is performed by a {@link IRichBolt Bolt}.
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage: <code>WordCount <text path> <result path></code><br>
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>WordCount <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>use a Bolt within a Flink Streaming program.</li>
* </ul>
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
index eefbf78..125a044 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
@@ -17,7 +17,6 @@
package org.apache.flink.storm.wordcount;
-import org.apache.storm.topology.IRichBolt;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -32,17 +31,19 @@ import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.storm.topology.IRichBolt;
+
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The tokenizer step is performed by a {@link IRichBolt Bolt}. In contrast to {@link BoltTokenizerWordCount}
* the tokenizer's input is a POJO type and the single field is accessed by name.
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage: <code>WordCount <text path> <result path></code><br>
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>WordCount <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>how to access attributes by name within a Bolt for POJO type input streams
* </ul>
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
index 98f7f96..f469bab 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
@@ -17,8 +17,6 @@
package org.apache.flink.storm.wordcount;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.tuple.Fields;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
@@ -34,17 +32,20 @@ import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.tuple.Fields;
+
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The tokenizer step is performed by a {@link IRichBolt Bolt}. In contrast to {@link BoltTokenizerWordCount}
* the tokenizer's input is a {@link Tuple} type and the single field is accessed by name.
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage: <code>WordCount <text path> <result path></code><br>
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>WordCount <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>how to access attributes by name within a Bolt for {@link Tuple} type input streams
* </ul>
@@ -120,7 +121,7 @@ public class BoltTokenizerWordCountWithNames {
private static DataStream<Tuple1<String>> getTextDataStream(final StreamExecutionEnvironment env) {
if (fileOutput) {
// read the text file from given input path
- TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor
+ TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>) TypeExtractor
.getForObject(new Tuple1<String>(""));
return env.createInput(new TupleCsvInputFormat<Tuple1<String>>(new Path(
textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
index 683a3b5..f0cfd7a 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
@@ -17,9 +17,6 @@
package org.apache.flink.storm.wordcount;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.utils.Utils;
-
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -31,16 +28,19 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.utils.Utils;
+
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The used data source is a {@link IRichSpout Spout}.
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage: <code>WordCount <text path> <result path></code><br>
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>WordCount <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>use a Spout within a Flink Streaming program.</li>
* </ul>
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
index ee880ba..82c8ae3 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
@@ -17,35 +17,35 @@
package org.apache.flink.storm.wordcount;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopology;
-
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
* same way as to a Storm {@link LocalCluster}.
- * <p>
- * This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology}
+ *
+ * <p>This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology}
* via Flink command line clients (ie, bin/flink).
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage: <code>WordCount <text path> <result path></code><br>
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>WordCount <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>run a regular Storm program locally on Flink</li>
* </ul>
*/
public class WordCountLocal {
- public final static String topologyId = "Storm WordCount";
+ private static final String topologyId = "Storm WordCount";
// *************************************************************************
// PROGRAM
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
index ab423cf..b960b79 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
@@ -17,36 +17,36 @@
package org.apache.flink.storm.wordcount;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopology;
-
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
* same way as to a Storm {@link LocalCluster}. In contrast to {@link WordCountLocal} all bolts access the field of
* input tuples by name instead of index.
- * <p>
- * This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology}
+ *
+ * <p>This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology}
* via Flink command line clients (ie, bin/flink).
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage: <code>WordCountLocalByName <text path> <result path></code><br>
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>WordCountLocalByName <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>run a regular Storm program locally on Flink
* </ul>
*/
public class WordCountLocalByName {
- public final static String topologyId = "Storm WordCountName";
+ private static final String topologyId = "Storm WordCountName";
// *************************************************************************
// PROGRAM
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
index 5c99f93..8dff6d7 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
@@ -17,6 +17,10 @@
package org.apache.flink.storm.wordcount;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.api.FlinkClient;
+import org.apache.flink.storm.api.FlinkTopology;
+
import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
@@ -26,31 +30,27 @@ import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.storm.api.FlinkClient;
-import org.apache.flink.storm.api.FlinkTopology;
-
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
* same way as to a Storm cluster similar to {@link NimbusClient}. The Flink cluster can be local or remote.
- * <p>
- * This example shows how to submit the program via Java, thus it cannot be used to submit a {@link StormTopology} via
+ *
+ * <p>This example shows how to submit the program via Java, thus it cannot be used to submit a {@link StormTopology} via
* Flink command line clients (ie, bin/flink).
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage: <code>WordCountRemoteByClient <text path> <result path></code><br>
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>WordCountRemoteByClient <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>submit a regular Storm program to a local or remote Flink cluster.</li>
* </ul>
*/
public class WordCountRemoteByClient {
- public final static String topologyId = "Storm WordCount";
- private final static String uploadedJarLocation = "WordCount-StormTopology.jar";
+ private static final String topologyId = "Storm WordCount";
+ private static final String uploadedJarLocation = "WordCount-StormTopology.jar";
// *************************************************************************
// PROGRAM
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
index 08ba52a..745ec85 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
@@ -17,35 +17,35 @@
package org.apache.flink.storm.wordcount;
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-
-import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.api.FlinkClient;
import org.apache.flink.storm.api.FlinkSubmitter;
import org.apache.flink.storm.api.FlinkTopology;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
* same way as to a Storm cluster similar to {@link StormSubmitter}. The Flink cluster can be local or remote.
- * <p>
- * This example shows how to submit the program via Java as well as Flink's command line client (ie, bin/flink).
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage: <code>WordCountRemoteBySubmitter <text path> <result path></code><br>
+ *
+ * <p>This example shows how to submit the program via Java as well as Flink's command line client (ie, bin/flink).
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage: <code>WordCountRemoteBySubmitter <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>submit a regular Storm program to a local or remote Flink cluster.</li>
* </ul>
*/
public class WordCountRemoteBySubmitter {
- public final static String topologyId = "Storm WordCount";
+ private static final String topologyId = "Storm WordCount";
// *************************************************************************
// PROGRAM
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
index 8f855b5..8627145 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
@@ -17,10 +17,6 @@
package org.apache.flink.storm.wordcount;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.BoltPrintSink;
@@ -34,27 +30,31 @@ import org.apache.flink.storm.wordcount.operators.BoltTokenizerByName;
import org.apache.flink.storm.wordcount.operators.WordCountFileSpout;
import org.apache.flink.storm.wordcount.operators.WordCountInMemorySpout;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
* fashion. The program is constructed as a regular {@link StormTopology}.
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage:
+ *
+ * <p>The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>Usage:
* <code>WordCount[Local|LocalByName|RemoteByClient|RemoteBySubmitter] <text path> <result path></code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p>
- * This example shows how to:
+ *
+ * <p>This example shows how to:
* <ul>
* <li>how to construct a regular Storm topology as Flink program</li>
* </ul>
*/
public class WordCountTopology {
- public final static String spoutId = "source";
- public final static String tokenierzerId = "tokenizer";
- public final static String counterId = "counter";
- public final static String sinkId = "sink";
- private final static OutputFormatter formatter = new TupleOutputFormatter();
+ private static final String spoutId = "source";
+ private static final String tokenierzerId = "tokenizer";
+ private static final String counterId = "counter";
+ private static final String sinkId = "sink";
+ private static final OutputFormatter formatter = new TupleOutputFormatter();
public static TopologyBuilder buildTopology() {
return buildTopology(true);
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java
index 4a00869..34fc703 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java
@@ -32,8 +32,8 @@ import java.util.Map;
* Implements the word counter that counts the occurrence of each unique word. The bolt takes a pair (input tuple
* schema: {@code <String,Integer>}) and sums the given word count for each unique word (output tuple schema:
* {@code <String,Integer>} ).
- * <p>
- * Same as {@link BoltCounterByName}, but accesses input attribute by index (instead of name).
+ *
+ * <p>Same as {@link BoltCounterByName}, but accesses input attribute by index (instead of name).
*/
public class BoltCounter implements IRichBolt {
private static final long serialVersionUID = 399619605462625934L;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java
index e3e0d58..cd53d50 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java
@@ -32,8 +32,8 @@ import java.util.Map;
* Implements the word counter that counts the occurrence of each unique word. The bolt takes a pair (input tuple
* schema: {@code <String,Integer>}) and sums the given word count for each unique word (output tuple schema:
* {@code <String,Integer>} ).
- * <p>
- * Same as {@link BoltCounter}, but accesses input attribute by name (instead of index).
+ *
+ * <p>Same as {@link BoltCounter}, but accesses input attribute by name (instead of index).
*/
public class BoltCounterByName implements IRichBolt {
private static final long serialVersionUID = 399619605462625934L;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java
index cedd90a..41e8a8d 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java
@@ -31,8 +31,8 @@ import java.util.Map;
* Implements the string tokenizer that splits sentences into words as a bolt. The bolt takes a line (input tuple
* schema: {@code <String>}) and splits it into multiple pairs in the form of "(word,1)" (output tuple schema:
* {@code <String,Integer>}).
- * <p>
- * Same as {@link BoltTokenizerByName}, but accesses input attribute by index (instead of name).
+ *
+ * <p>Same as {@link BoltTokenizerByName}, but accesses input attribute by index (instead of name).
*/
public final class BoltTokenizer implements IRichBolt {
private static final long serialVersionUID = -8589620297208175149L;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java
index 258d412..dff39eb 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java
@@ -31,8 +31,8 @@ import java.util.Map;
* Implements the string tokenizer that splits sentences into words as a bolt. The bolt takes a line (input tuple
* schema: {@code <String>}) and splits it into multiple pairs in the form of "(word,1)" (output tuple schema:
* {@code <String,Integer>}).
- * <p>
- * Same as {@link BoltTokenizer}, but accesses input attribute by name (instead of index).
+ *
+ * <p>Same as {@link BoltTokenizer}, but accesses input attribute by name (instead of index).
*/
public final class BoltTokenizerByName implements IRichBolt {
private static final long serialVersionUID = -8589620297208175149L;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java
index 3a8fd3a..d63974b 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java
@@ -17,12 +17,15 @@
package org.apache.flink.storm.wordcount.operators;
-import java.io.Serializable;
-
import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import java.io.Serializable;
+
+/**
+ * Input POJOs for WordCount programs.
+ */
public class WordCountDataPojos {
- public static Sentence[] SENTENCES;
+ public static final Sentence[] SENTENCES;
static {
SENTENCES = new Sentence[WordCountData.WORDS.length];
@@ -31,6 +34,9 @@ public class WordCountDataPojos {
}
}
+ /**
+ * Simple POJO containing a string.
+ */
public static class Sentence implements Serializable {
private static final long serialVersionUID = -7336372859203407522L;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java
index 16e2ba0..d01d9a2 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java
@@ -20,9 +20,12 @@ package org.apache.flink.storm.wordcount.operators;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
+/**
+ * Input tuples for WordCount programs.
+ */
@SuppressWarnings("unchecked")
public class WordCountDataTuple {
- public static Tuple1<String>[] TUPLES;
+ public static final Tuple1<String>[] TUPLES;
static {
TUPLES = new Tuple1[WordCountData.WORDS.length];
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
index 7bf40c2..be376a9 100644
--- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
@@ -17,11 +17,12 @@
package org.apache.flink.storm.wordcount.operators;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.util.FiniteInMemorySpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+
/**
* Implements a Spout that reads data from {@link WordCountData#WORDS}.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
index 5a37572..358919f 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
@@ -22,6 +22,9 @@ import org.apache.flink.storm.exclamation.util.ExclamationData;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
+/**
+ * Test for the ExclamationWithBolt example.
+ */
public class ExclamationWithBoltITCase extends StreamingProgramTestBase {
protected String textPath;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
index c2b0467..61310e8 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
@@ -22,6 +22,9 @@ import org.apache.flink.storm.exclamation.util.ExclamationData;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
+/**
+ * Test for the ExclamationWithSpout example.
+ */
public class ExclamationWithSpoutITCase extends StreamingProgramTestBase {
protected String textPath;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
index 049c881..bc09a3d 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
@@ -22,6 +22,9 @@ import org.apache.flink.storm.exclamation.util.ExclamationData;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
+/**
+ * Test for the ExclamationLocal example.
+ */
public class StormExclamationLocalITCase extends StreamingProgramTestBase {
protected String textPath;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java
index 3c435f9..f700009 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java
@@ -18,6 +18,9 @@
package org.apache.flink.storm.exclamation.util;
+/**
+ * Expected output of Exclamation programs.
+ */
public class ExclamationData {
public static final String TEXT_WITH_EXCLAMATIONS =
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java
index b51db2c..83531ba 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java
@@ -18,12 +18,16 @@
package org.apache.flink.storm.join;
-import com.google.common.base.Joiner;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import com.google.common.base.Joiner;
+
+/**
+ * Test for the SingleJoin example.
+ */
public class SingleJoinITCase extends StreamingProgramTestBase {
- protected static String expectedOutput[] = {
+ protected static String[] expectedOutput = {
"(male,20)",
"(female,21)",
"(male,22)",
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
index 0fc1ba5..90ee795 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
@@ -15,9 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.storm.split;
-import java.util.Map;
+package org.apache.flink.storm.split;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -27,6 +26,11 @@ import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
+import java.util.Map;
+
+/**
+ * A bolt for splitting an input stream containing numbers based on whether they are even or odd.
+ */
public class SplitBolt extends BaseRichBolt {
private static final long serialVersionUID = -6627606934204267173L;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
index 04cfeed..c002840 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
@@ -15,9 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.storm.split;
-import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.split.operators.RandomSpout;
import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
import org.apache.flink.storm.util.BoltFileSink;
@@ -25,13 +25,18 @@ import org.apache.flink.storm.util.BoltPrintSink;
import org.apache.flink.storm.util.OutputFormatter;
import org.apache.flink.storm.util.TupleOutputFormatter;
+import org.apache.storm.topology.TopologyBuilder;
+
+/**
+ * A simple topology that splits a stream of numbers based on their parity, and verifies the result.
+ */
public class SplitBoltTopology {
- public final static String spoutId = "randomSource";
- public final static String boltId = "splitBolt";
- public final static String evenVerifierId = "evenVerifier";
- public final static String oddVerifierId = "oddVerifier";
- public final static String sinkId = "sink";
- private final static OutputFormatter formatter = new TupleOutputFormatter();
+ private static final String spoutId = "randomSource";
+ private static final String boltId = "splitBolt";
+ private static final String evenVerifierId = "evenVerifier";
+ private static final String oddVerifierId = "oddVerifier";
+ private static final String sinkId = "sink";
+ private static final OutputFormatter formatter = new TupleOutputFormatter();
public static TopologyBuilder buildTopology() {
final TopologyBuilder builder = new TopologyBuilder();
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
index 4da9708..d53493c 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitITCase.java
@@ -14,19 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.storm.split;
-import java.io.File;
-import java.io.IOException;
+package org.apache.flink.storm.split;
import org.apache.flink.storm.split.SpoutSplitExample.Enrich;
import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Tests for split examples.
+ */
public class SplitITCase extends StreamingMultipleProgramsTestBase {
private String output;
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
index 8671d2e..aa92a95 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
@@ -15,9 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.storm.split;
-import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.split.operators.RandomSpout;
import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
import org.apache.flink.storm.util.BoltFileSink;
@@ -25,12 +25,18 @@ import org.apache.flink.storm.util.BoltPrintSink;
import org.apache.flink.storm.util.OutputFormatter;
import org.apache.flink.storm.util.TupleOutputFormatter;
+import org.apache.storm.topology.TopologyBuilder;
+
+/**
+ * A simple topology similar to the {@link SplitBoltTopology}, except that the split streams are generated directly in
+ * a spout.
+ */
public class SplitSpoutTopology {
- public final static String spoutId = "randomSplitSource";
- public final static String evenVerifierId = "evenVerifier";
- public final static String oddVerifierId = "oddVerifier";
- public final static String sinkId = "sink";
- private final static OutputFormatter formatter = new TupleOutputFormatter();
+ private static final String spoutId = "randomSplitSource";
+ private static final String evenVerifierId = "evenVerifier";
+ private static final String oddVerifierId = "oddVerifier";
+ private static final String sinkId = "sink";
+ private static final OutputFormatter formatter = new TupleOutputFormatter();
public static TopologyBuilder buildTopology() {
final TopologyBuilder builder = new TopologyBuilder();
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
index 2cde11e..55c3bd3 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
@@ -14,15 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.storm.split;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+
+/**
+ * An example using the {@link SplitBoltTopology}.
+ */
public class SplitStreamBoltLocal {
- public final static String topologyId = "Bolt split stream example";
+ private static final String topologyId = "Bolt split stream example";
// *************************************************************************
// PROGRAM
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
index be880d0..da6e574 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
@@ -14,15 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.storm.split;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+
+/**
+ * An example using the {@link SplitSpoutTopology}.
+ */
public class SplitStreamSpoutLocal {
- public final static String topologyId = "Spout split stream example";
+ private static final String topologyId = "Spout split stream example";
// *************************************************************************
// PROGRAM
http://git-wip-us.apache.org/repos/asf/flink/blob/2a2d984f/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
index 581f7c1..c861c9e 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
@@ -15,11 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.storm.tests;
-import org.apache.storm.Config;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
@@ -28,6 +26,10 @@ import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.util.MathUtils;
+
+import org.apache.storm.Config;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
import org.junit.Assert;
import java.util.ArrayList;
@@ -41,10 +43,10 @@ import java.util.List;
*/
public class StormFieldsGroupingITCase extends StreamingProgramTestBase {
- private final static String topologyId = "FieldsGrouping Test";
- private final static String spoutId = "spout";
- private final static String boltId = "bolt";
- private final static String sinkId = "sink";
+ private static final String topologyId = "FieldsGrouping Test";
+ private static final String spoutId = "spout";
+ private static final String boltId = "bolt";
+ private static final String sinkId = "sink";
private String resultPath;
@Override
@@ -62,19 +64,19 @@ public class StormFieldsGroupingITCase extends StreamingProgramTestBase {
readAllResultLines(actualResults, resultPath, new String[0], false);
//remove potential operator id prefix
- for(int i = 0; i < actualResults.size(); ++i) {
+ for (int i = 0; i < actualResults.size(); ++i) {
String s = actualResults.get(i);
- if(s.contains(">")) {
+ if (s.contains(">")) {
s = s.substring(s.indexOf(">") + 2);
actualResults.set(i, s);
}
}
- Assert.assertEquals(expectedResults.size(),actualResults.size());
+ Assert.assertEquals(expectedResults.size(), actualResults.size());
Collections.sort(actualResults);
Collections.sort(expectedResults);
System.out.println(actualResults);
- for(int i=0; i< actualResults.size(); ++i) {
+ for (int i = 0; i < actualResults.size(); ++i) {
//compare against actual results with removed prefix (as it depends e.g. on the hash function used)
Assert.assertEquals(expectedResults.get(i), actualResults.get(i));
}