You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/15 11:33:13 UTC
[23/27] flink git commit: [storm-compat] Moved Storm-compatibility to
flink-contrib and split flink-contrib into small sub-projects
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
new file mode 100644
index 0000000..bfc3135
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import java.io.Serializable;
+
+import backtype.storm.tuple.Tuple;
+
+public interface OutputFormatter extends Serializable {
+
+ public String format(Tuple input);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
new file mode 100644
index 0000000..a9d72d9
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.tuple.Tuple;
+
+public class SimpleOutputFormatter implements OutputFormatter {
+ private static final long serialVersionUID = 6349573860144270338L;
+
+ @Override
+ public String format(final Tuple input) {
+ return input.getValue(0).toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
new file mode 100644
index 0000000..a92bc6a
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.task.TopologyContext;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Implements a sink that write the received data to the given file (as a result of {@code Object.toString()} for each
+ * attribute).
+ */
+public final class StormBoltFileSink extends AbstractStormBoltSink {
+ private static final long serialVersionUID = 2014027288631273666L;
+
+ private final String path;
+ private BufferedWriter writer;
+
+ public StormBoltFileSink(final String path, final OutputFormatter formatter) {
+ super(formatter);
+ this.path = path;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepareSimple(final Map stormConf, final TopologyContext context) {
+ try {
+ this.writer = new BufferedWriter(new FileWriter(this.path));
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void writeExternal(final String line) {
+ try {
+ this.writer.write(line + "\n");
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ if (this.writer != null) {
+ try {
+ this.writer.close();
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java
new file mode 100644
index 0000000..3bf49d0
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.task.TopologyContext;
+
+import java.util.Map;
+
+/**
+ * Implements a sink that prints the received data to {@code stdout}.
+ */
+public final class StormBoltPrintSink extends AbstractStormBoltSink {
+ private static final long serialVersionUID = -6650011223001009519L;
+
+ public StormBoltPrintSink(OutputFormatter formatter) {
+ super(formatter);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepareSimple(final Map stormConf, final TopologyContext context) {
+ /* nothing to do */
+ }
+
+ @Override
+ public void writeExternal(final String line) {
+ System.out.println(line);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
new file mode 100644
index 0000000..c38b599
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Implements a Storm Spout that reads data from a given local file.
+ */
+public final class StormFileSpout extends AbstractStormSpout {
+ private static final long serialVersionUID = -6996907090003590436L;
+
+ private final String path;
+ private BufferedReader reader;
+
+ public StormFileSpout(final String path) {
+ this.path = path;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+ super.open(conf, context, collector);
+ try {
+ this.reader = new BufferedReader(new FileReader(this.path));
+ } catch (final FileNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (this.reader != null) {
+ try {
+ this.reader.close();
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void nextTuple() {
+ String line;
+ try {
+ line = this.reader.readLine();
+ if (line != null) {
+ this.collector.emit(new Values(line));
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
new file mode 100644
index 0000000..3e6081c
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.tuple.Values;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+
+/**
+ * Implements a Storm Spout that reads data from {@link WordCountData#WORDS}.
+ */
+public final class StormInMemorySpout extends AbstractStormSpout {
+ private static final long serialVersionUID = -4008858647468647019L;
+
+ private String[] source;
+ private int counter = 0;
+
+ public StormInMemorySpout(String[] source) {
+ this.source = source;
+ }
+
+ @Override
+ public void nextTuple() {
+ if (this.counter < WordCountData.WORDS.length) {
+ this.collector.emit(new Values(WordCountData.WORDS[this.counter++]));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
new file mode 100644
index 0000000..6419ee3
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.tuple.Tuple;
+
+public class TupleOutputFormatter implements OutputFormatter {
+ private static final long serialVersionUID = -599665757723851761L;
+
+ @Override
+ public String format(final Tuple input) {
+ final StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("(");
+ for (final Object attribute : input.getValues()) {
+ stringBuilder.append(attribute);
+ stringBuilder.append(",");
+ }
+ stringBuilder.replace(stringBuilder.length() - 1, stringBuilder.length(), ")");
+ return stringBuilder.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
new file mode 100644
index 0000000..606a3ce
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import backtype.storm.topology.IRichBolt;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount <text path> <result path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Storm bolt within a Flink Streaming program.
+ * </ul>
+ */
+public class BoltTokenizerWordCount {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ final DataStream<String> text = getTextDataStream(env);
+
+ final DataStream<Tuple2<String, Integer>> counts = text
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ // this is done by a Storm bolt that is wrapped accordingly
+ .transform("StormBoltTokenizer",
+ TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+ new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer()))
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ // group by the tuple field "0" and sum up tuple field "1"
+ .groupBy(0).sum(1);
+
+ // emit result
+ if (fileOutput) {
+ counts.writeAsText(outputPath);
+ } else {
+ counts.print();
+ }
+
+ // execute program
+ env.execute("Streaming WordCount with Storm bolt tokenizer");
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(final String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: BoltTokenizerWordCount <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing BoltTokenizerWordCount example with built-in default data");
+ System.out.println(" Provide parameters to read input data from a file");
+ System.out.println(" Usage: BoltTokenizerWordCount <text path> <result path>");
+ }
+ return true;
+ }
+
+ private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+ if (fileOutput) {
+ // read the text file from given input path
+ return env.readTextFile(textPath);
+ }
+
+ return env.fromElements(WordCountData.WORDS);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
new file mode 100644
index 0000000..0ae51c6
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import backtype.storm.topology.IRichSpout;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.util.StormFileSpout;
+import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
+import org.apache.flink.stormcompatibility.wrappers.StormFiniteSpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The used data source is a Storm {@link IRichSpout bolt}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount <text path> <result path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Storm bolt within a Flink Streaming program.
+ * </ul>
+ */
+public class SpoutSourceWordCount {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ final DataStream<String> text = getTextDataStream(env);
+
+ final DataStream<Tuple2<String, Integer>> counts =
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ text.flatMap(new Tokenizer())
+ // group by the tuple field "0" and sum up tuple field "1"
+ .groupBy(0).sum(1);
+
+ // emit result
+ if (fileOutput) {
+ counts.writeAsText(outputPath);
+ } else {
+ counts.print();
+ }
+
+ // execute program
+ env.execute("Streaming WordCount with Storm spout source");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Implements the string tokenizer that splits sentences into words as a user-defined FlatMapFunction. The function
+ * takes a line (String) and splits it into multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+ */
+ public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(final String value, final Collector<Tuple2<String, Integer>> out) throws Exception {
+ // normalize and split the line
+ final String[] tokens = value.toLowerCase().split("\\W+");
+
+ // emit the pairs
+ for (final String token : tokens) {
+ if (token.length() > 0) {
+ out.collect(new Tuple2<String, Integer>(token, 1));
+ }
+ }
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(final String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: SpoutSourceWordCount <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing SpoutSourceWordCount example with built-in default data");
+ System.out.println(" Provide parameters to read input data from a file");
+ System.out.println(" Usage: SpoutSourceWordCount <text path> <result path>");
+ }
+ return true;
+ }
+
+ private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+ if (fileOutput) {
+ // read the text file from given input path
+ final String[] tokens = textPath.split(":");
+ final String localFile = tokens[tokens.length - 1];
+ return env.addSource(
+ new StormFiniteSpoutWrapper<String>(new StormFileSpout(localFile), true),
+ TypeExtractor.getForClass(String.class)).setParallelism(1);
+ }
+
+ return env.addSource(new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(WordCountData.WORDS), true),
+ TypeExtractor.getForClass(String.class));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
new file mode 100644
index 0000000..7b4f471
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.utils.Utils;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
+ * same way as to a Storm {@link LocalCluster}.
+ * <p/>
+ * This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology}
+ * via Flink command line clients (ie, bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount <text path> <result path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>run a regular Storm program locally on Flink
+ * </ul>
+ */
+public class StormWordCountLocal {
+ public final static String topologyId = "Streaming WordCount";
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!WordCountTopology.parseParameters(args)) {
+ return;
+ }
+
+ // build Topology the Storm way
+ final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();
+
+ // execute program locally
+ final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
+ cluster.submitTopology(topologyId, null, builder.createTopology());
+
+ Utils.sleep(5 * 1000);
+
+ // TODO kill does no do anything so far
+ cluster.killTopology(topologyId);
+ cluster.shutdown();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
new file mode 100644
index 0000000..9e56c14
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.NotAliveException;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.api.FlinkClient;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
+ * same way as to a Storm cluster similar to {@link NimbusClient}. The Flink cluster can be local or remote.
+ * <p/>
+ * This example shows how to submit the program via Java, thus it cannot be used to submit a {@link StormTopology} via
+ * Flink command line clients (ie, bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount <text path> <result path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>submit a regular Storm program to a local or remote Flink cluster.
+ * </ul>
+ */
+public class StormWordCountRemoteByClient {
+ public final static String topologyId = "Streaming WordCount";
+ private final static String uploadedJarLocation = "target/flink-storm-examples-0.9-SNAPSHOT-WordCountStorm.jar";
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws AlreadyAliveException, InvalidTopologyException,
+ NotAliveException {
+
+ if (!WordCountTopology.parseParameters(args)) {
+ return;
+ }
+
+ // build Topology the Storm way
+ final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();
+
+ // execute program on Flink cluster
+ final Config conf = new Config();
+ // can be changed to remote address
+ conf.put(Config.NIMBUS_HOST, "localhost");
+ // use default flink jobmanger.rpc.port
+ conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
+
+ final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
+ cluster.submitTopology(topologyId, uploadedJarLocation, builder.createTopology());
+
+ Utils.sleep(5 * 1000);
+
+ cluster.killTopology(topologyId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
new file mode 100644
index 0000000..a1fb79d
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.api.FlinkClient;
+import org.apache.flink.stormcompatibility.api.FlinkSubmitter;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
+ * same way as to a Storm cluster similar to {@link StormSubmitter}. The Flink cluster can be local or remote.
+ * <p/>
+ * This example shows how to submit the program via Java as well as Flink's command line client (ie, bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount <text path> <result path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>submit a regular Storm program to a local or remote Flink cluster.
+ * </ul>
+ */
+public class StormWordCountRemoteBySubmitter {
+ public final static String topologyId = "Streaming WordCount";
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!WordCountTopology.parseParameters(args)) {
+ return;
+ }
+
+ // build Topology the Storm way
+ final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();
+
+ // execute program on Flink cluster
+ final Config conf = new Config();
+ // We can set Jobmanager host/port values manually or leave them blank
+ // if not set and
+ // - executed within Java, default values "localhost" and "6123" are set by FlinkSubmitter
+ // - executed via bin/flink values from flink-conf.yaml are set by FlinkSubmitter.
+ // conf.put(Config.NIMBUS_HOST, "localhost");
+ // conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123));
+
+ // The user jar file must be specified via JVM argument if executed via Java.
+ // => -Dstorm.jar=target/flink-storm-examples-0.9-SNAPSHOT-WordCountStorm.jar
+ // If bin/flink is used, the jar file is detected automatically.
+ FlinkSubmitter.submitTopology(topologyId, conf, builder.createTopology());
+
+ Thread.sleep(5 * 1000);
+
+ FlinkClient.getConfiguredClient(conf).killTopology(topologyId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
new file mode 100644
index 0000000..d39a526
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+import org.apache.flink.stormcompatibility.util.OutputFormatter;
+import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
+import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
+import org.apache.flink.stormcompatibility.util.StormFileSpout;
+import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
+import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounter;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount <text path> <result path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>how to construct a regular Storm topology as Flink program
+ * </ul>
+ */
+public class WordCountTopology {
+ public final static String spoutId = "source";
+ public final static String tokenierzerId = "tokenizer";
+ public final static String counterId = "counter";
+ public final static String sinkId = "sink";
+ private final static OutputFormatter formatter = new TupleOutputFormatter();
+
+ public static FlinkTopologyBuilder buildTopology() {
+
+ final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+
+ // get input data
+ if (fileInputOutput) {
+ // read the text file from given input path
+ final String[] tokens = textPath.split(":");
+ final String inputFile = tokens[tokens.length - 1];
+ builder.setSpout(spoutId, new StormFileSpout(inputFile));
+ } else {
+ builder.setSpout(spoutId, new StormInMemorySpout(WordCountData.WORDS));
+ }
+
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ builder.setBolt(tokenierzerId, new StormBoltTokenizer(), 4).shuffleGrouping(spoutId);
+ // group by the tuple field "0" and sum up tuple field "1"
+ builder.setBolt(counterId, new StormBoltCounter(), 4).fieldsGrouping(tokenierzerId,
+ new Fields(StormBoltTokenizer.ATTRIBUTE_WORD));
+
+ // emit result
+ if (fileInputOutput) {
+ // read the text file from given input path
+ final String[] tokens = outputPath.split(":");
+ final String outputFile = tokens[tokens.length - 1];
+ builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter)).shuffleGrouping(counterId);
+ } else {
+ builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4).shuffleGrouping(counterId);
+ }
+
+ return builder;
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileInputOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ static boolean parseParameters(final String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileInputOutput = true;
+ if (args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: StormWordCount* <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing StormWordCount* example with built-in default data");
+ System.out.println(" Provide parameters to read input data from a file");
+ System.out.println(" Usage: StormWordCount* <text path> <result path>");
+ }
+
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
new file mode 100644
index 0000000..214ca5e
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements the word counter that the occurrence of each unique word. The bolt takes a pair (input tuple schema:
+ * {@code <String,Integer>}) and sums the given word count for each unique word (output tuple schema:
+ * {@code <String,Integer>} ).
+ */
+public class StormBoltCounter implements IRichBolt {
+ private static final long serialVersionUID = 399619605462625934L;
+
+ public static final String ATTRIBUTE_WORD = "word";
+ public static final String ATTRIBUTE_COUNT = "count";
+
+ private final HashMap<String, Count> counts = new HashMap<String, Count>();
+ private OutputCollector collector;
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(final Tuple input) {
+ final String word = input.getString(StormBoltTokenizer.ATTRIBUTE_WORD_INDEX);
+
+ Count currentCount = this.counts.get(word);
+ if (currentCount == null) {
+ currentCount = new Count();
+ this.counts.put(word, currentCount);
+ }
+ currentCount.count += input.getInteger(StormBoltTokenizer.ATTRIBUTE_COUNT_INDEX);
+
+ this.collector.emit(new Values(word, currentCount.count));
+ }
+
+ @Override
+ public void cleanup() {/* nothing to do */}
+
+ @Override
+ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ /**
+ * A counter helper to emit immutable tuples to the given stormCollector and avoid unnecessary object
+ * creating/deletion.
+ *
+ * @author mjsax
+ */
+ private static final class Count {
+ public int count;
+
+ public Count() {/* nothing to do */}
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
new file mode 100644
index 0000000..96bd87c
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Implements the string tokenizer that splits sentences into words as a Storm bolt. The bolt takes a line (input tuple
+ * schema: {@code <String>}) and splits it into multiple pairs in the form of "(word,1)" (output tuple schema:
+ * {@code <String,Integer>}).
+ */
+public final class StormBoltTokenizer implements IRichBolt {
+ private static final long serialVersionUID = -8589620297208175149L;
+
+ public static final String ATTRIBUTE_WORD = "word";
+ public static final String ATTRIBUTE_COUNT = "count";
+
+ public static final int ATTRIBUTE_WORD_INDEX = 0;
+ public static final int ATTRIBUTE_COUNT_INDEX = 1;
+
+ private OutputCollector collector;
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(final Tuple input) {
+ final String[] tokens = input.getString(0).toLowerCase().split("\\W+");
+
+ for (final String token : tokens) {
+ if (token.length() > 0) {
+ this.collector.emit(new Values(token, 1));
+ }
+ }
+ }
+
+ @Override
+ public void cleanup() {/* nothing to do */}
+
+ @Override
+ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java
new file mode 100644
index 0000000..68f1216
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.api;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.KillOptions;
+import backtype.storm.generated.RebalanceOptions;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.generated.TopologyInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+
+import java.util.Map;
+
+/**
+ * {@link FlinkTestCluster} mimics a Storm {@link LocalCluster} for ITCases via a {@link TestStreamEnvironment}.
+ */
+public class FlinkTestCluster extends FlinkLocalCluster {
+
+ @Override
+ public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
+ throws Exception {
+ this.submitTopologyWithOpts(topologyName, conf, topology, null);
+ }
+
+ @Override
+ public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
+ final SubmitOptions submitOpts)
+ throws Exception {
+ final TestStreamEnvironment env = (TestStreamEnvironment) StreamExecutionEnvironment.getExecutionEnvironment();
+ env.start(topology.getStreamGraph().getJobGraph(topologyName));
+ }
+
+ @Override
+ public void killTopology(final String topologyName) {
+ }
+
+ @Override
+ public void killTopologyWithOpts(final String name, final KillOptions options) {
+ }
+
+ @Override
+ public void activate(final String topologyName) {
+ }
+
+ @Override
+ public void deactivate(final String topologyName) {
+ }
+
+ @Override
+ public void rebalance(final String name, final RebalanceOptions options) {
+ }
+
+ @Override
+ public void shutdown() {
+ final TestStreamEnvironment env = (TestStreamEnvironment) StreamExecutionEnvironment.getExecutionEnvironment();
+ try {
+ env.shutdown();
+ } catch (final InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public String getTopologyConf(final String id) {
+ return null;
+ }
+
+ @Override
+ public StormTopology getTopology(final String id) {
+ return null;
+ }
+
+ @Override
+ public ClusterSummary getClusterInfo() {
+ return null;
+ }
+
+ @Override
+ public TopologyInfo getTopologyInfo(final String id) {
+ return null;
+ }
+
+ @Override
+ public Map<?, ?> getState() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
new file mode 100644
index 0000000..75dd5fc
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormBoltExclamationITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.exclamation;
+
+import org.apache.flink.stormcompatibility.excamation.StormBoltExclamation;
+import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class StormBoltExclamationITCase extends StreamingProgramTestBase {
+
+ protected String textPath;
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+ this.resultPath = this.getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ StormBoltExclamation.main(new String[]{this.textPath, this.resultPath});
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
new file mode 100644
index 0000000..d6bcf30
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormExclamationLocalITCase.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.exclamation;
+
+import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
+import org.apache.flink.stormcompatibility.api.FlinkTestCluster;
+import org.apache.flink.stormcompatibility.excamation.StormExclamationLocal;
+import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class StormExclamationLocalITCase extends StreamingProgramTestBase {
+
+ protected String textPath;
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ FlinkLocalCluster.initialize(new FlinkTestCluster());
+ this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+ this.resultPath = this.getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ StormExclamationLocal.main(new String[]{this.textPath, this.resultPath});
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
new file mode 100644
index 0000000..2b08b4b
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/StormSpoutExclamationITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.exclamation;
+
+import org.apache.flink.stormcompatibility.excamation.StormSpoutExclamation;
+import org.apache.flink.stormcompatibility.exclamation.util.ExclamationData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class StormSpoutExclamationITCase extends StreamingProgramTestBase {
+
+ protected String textPath;
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+ this.resultPath = this.getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ StormSpoutExclamation.main(new String[]{this.textPath, this.resultPath});
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/util/ExclamationData.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/util/ExclamationData.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/util/ExclamationData.java
new file mode 100644
index 0000000..8b823b5
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/exclamation/util/ExclamationData.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.exclamation.util;
+
+public class ExclamationData {
+
+ public static final String TEXT_WITH_EXCLAMATIONS =
+ "Goethe - Faust: Der Tragoedie erster Teil!!!!!!\n"
+ + "Prolog im Himmel.!!!!!!\n"
+ + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei!!!!!!\n"
+ + "Erzengel treten vor.!!!!!!\n"
+ + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,!!!!!!\n"
+ + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick!!!!!!\n"
+ + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich!!!!!!\n"
+ + "hohen Werke Sind herrlich wie am ersten Tag.!!!!!!\n"
+ + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde!!!!!!\n"
+ + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es!!!!!!\n"
+ + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und!!!!!!\n"
+ + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.!!!!!!\n"
+ + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land!!!!!!\n"
+ + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.!!!!!!\n"
+ + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch!!!!!!\n"
+ + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.!!!!!!\n"
+ + "ZU DREI: Der Anblick gibt den Engeln Staerke, Da keiner dich ergruenden!!!!!!\n"
+ + "mag, Und alle deine hohen Werke Sind herrlich wie am ersten Tag.!!!!!!\n"
+ + "MEPHISTOPHELES: Da du, o Herr, dich einmal wieder nahst Und fragst, wie!!!!!!\n"
+ + "alles sich bei uns befinde, Und du mich sonst gewoehnlich gerne sahst, So!!!!!!\n"
+ + "siehst du mich auch unter dem Gesinde. Verzeih, ich kann nicht hohe Worte!!!!!!\n"
+ + "machen, Und wenn mich auch der ganze Kreis verhoehnt; Mein Pathos braechte!!!!!!\n"
+ + "dich gewiss zum Lachen, Haettst du dir nicht das Lachen abgewoehnt. Von!!!!!!\n"
+ + "Sonn' und Welten weiss ich nichts zu sagen, Ich sehe nur, wie sich die!!!!!!\n"
+ + "Menschen plagen. Der kleine Gott der Welt bleibt stets von gleichem!!!!!!\n"
+ + "Schlag, Und ist so wunderlich als wie am ersten Tag. Ein wenig besser!!!!!!\n"
+ + "wuerd er leben, Haettst du ihm nicht den Schein des Himmelslichts gegeben;!!!!!!\n"
+ + "Er nennt's Vernunft und braucht's allein, Nur tierischer als jedes Tier!!!!!!\n"
+ + "zu sein. Er scheint mir, mit Verlaub von euer Gnaden, Wie eine der!!!!!!\n"
+ + "langbeinigen Zikaden, Die immer fliegt und fliegend springt Und gleich im!!!!!!\n"
+ + "Gras ihr altes Liedchen singt; Und laeg er nur noch immer in dem Grase! In!!!!!!\n"
+ + "jeden Quark begraebt er seine Nase.!!!!!!\n"
+ + "DER HERR: Hast du mir weiter nichts zu sagen? Kommst du nur immer!!!!!!\n"
+ + "anzuklagen? Ist auf der Erde ewig dir nichts recht?!!!!!!\n"
+ + "MEPHISTOPHELES: Nein Herr! ich find es dort, wie immer, herzlich!!!!!!\n"
+ + "schlecht. Die Menschen dauern mich in ihren Jammertagen, Ich mag sogar!!!!!!\n"
+ + "die armen selbst nicht plagen.!!!!!!\n" + "DER HERR: Kennst du den Faust?!!!!!!\n"
+ + "MEPHISTOPHELES: Den Doktor?!!!!!!\n"
+ + "DER HERR: Meinen Knecht!!!!!!!\n"
+ + "MEPHISTOPHELES: Fuerwahr! er dient Euch auf besondre Weise. Nicht irdisch!!!!!!\n"
+ + "ist des Toren Trank noch Speise. Ihn treibt die Gaerung in die Ferne, Er!!!!!!\n"
+ + "ist sich seiner Tollheit halb bewusst; Vom Himmel fordert er die schoensten!!!!!!\n"
+ + "Sterne Und von der Erde jede hoechste Lust, Und alle Naeh und alle Ferne!!!!!!\n"
+ + "Befriedigt nicht die tiefbewegte Brust.!!!!!!\n"
+ + "DER HERR: Wenn er mir auch nur verworren dient, So werd ich ihn bald in!!!!!!\n"
+ + "die Klarheit fuehren. Weiss doch der Gaertner, wenn das Baeumchen gruent, Das!!!!!!\n"
+ + "Bluet und Frucht die kuenft'gen Jahre zieren.!!!!!!\n"
+ + "MEPHISTOPHELES: Was wettet Ihr? den sollt Ihr noch verlieren! Wenn Ihr!!!!!!\n"
+ + "mir die Erlaubnis gebt, Ihn meine Strasse sacht zu fuehren.!!!!!!\n"
+ + "DER HERR: Solang er auf der Erde lebt, So lange sei dir's nicht verboten,!!!!!!\n"
+ + "Es irrt der Mensch so lang er strebt.!!!!!!\n"
+ + "MEPHISTOPHELES: Da dank ich Euch; denn mit den Toten Hab ich mich niemals!!!!!!\n"
+ + "gern befangen. Am meisten lieb ich mir die vollen, frischen Wangen. Fuer!!!!!!\n"
+ + "einem Leichnam bin ich nicht zu Haus; Mir geht es wie der Katze mit der Maus.!!!!!!\n"
+ + "DER HERR: Nun gut, es sei dir ueberlassen! Zieh diesen Geist von seinem!!!!!!\n"
+ + "Urquell ab, Und fuehr ihn, kannst du ihn erfassen, Auf deinem Wege mit!!!!!!\n"
+ + "herab, Und steh beschaemt, wenn du bekennen musst: Ein guter Mensch, in!!!!!!\n"
+ + "seinem dunklen Drange, Ist sich des rechten Weges wohl bewusst.!!!!!!\n"
+ + "MEPHISTOPHELES: Schon gut! nur dauert es nicht lange. Mir ist fuer meine!!!!!!\n"
+ + "Wette gar nicht bange. Wenn ich zu meinem Zweck gelange, Erlaubt Ihr mir!!!!!!\n"
+ + "Triumph aus voller Brust. Staub soll er fressen, und mit Lust, Wie meine!!!!!!\n"
+ + "Muhme, die beruehmte Schlange.!!!!!!\n"
+ + "DER HERR: Du darfst auch da nur frei erscheinen; Ich habe deinesgleichen!!!!!!\n"
+ + "nie gehasst. Von allen Geistern, die verneinen, ist mir der Schalk am!!!!!!\n"
+ + "wenigsten zur Last. Des Menschen Taetigkeit kann allzu leicht erschlaffen,!!!!!!\n"
+ + "er liebt sich bald die unbedingte Ruh; Drum geb ich gern ihm den Gesellen!!!!!!\n"
+ + "zu, Der reizt und wirkt und muss als Teufel schaffen. Doch ihr, die echten!!!!!!\n"
+ + "Goettersoehne, Erfreut euch der lebendig reichen Schoene! Das Werdende, das!!!!!!\n"
+ + "ewig wirkt und lebt, Umfass euch mit der Liebe holden Schranken, Und was!!!!!!\n"
+ + "in schwankender Erscheinung schwebt, Befestigt mit dauernden Gedanken!!!!!!!\n"
+ + "(Der Himmel schliesst, die Erzengel verteilen sich.)!!!!!!\n"
+ + "MEPHISTOPHELES (allein): Von Zeit zu Zeit seh ich den Alten gern, Und!!!!!!\n"
+ + "huete mich, mit ihm zu brechen. Es ist gar huebsch von einem grossen Herrn,!!!!!!\n"
+ + "So menschlich mit dem Teufel selbst zu sprechen.!!!!!!";
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
new file mode 100644
index 0000000..9228474
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class BoltTokenizerWordCountITCase extends StreamingProgramTestBase {
+
+ protected String textPath;
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+ this.resultPath = this.getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ BoltTokenizerWordCount.main(new String[]{this.textPath, this.resultPath});
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
new file mode 100644
index 0000000..9d7b869
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCountITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class SpoutSourceWordCountITCase extends StreamingProgramTestBase {
+
+ protected String textPath;
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+ this.resultPath = this.getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ SpoutSourceWordCount.main(new String[]{this.textPath, this.resultPath});
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
new file mode 100644
index 0000000..2427818
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalITCase.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
+import org.apache.flink.stormcompatibility.api.FlinkTestCluster;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class StormWordCountLocalITCase extends StreamingProgramTestBase {
+
+ protected String textPath;
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ FlinkLocalCluster.initialize(new FlinkTestCluster());
+ this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+ this.resultPath = this.getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ StormWordCountLocal.main(new String[]{this.textPath, this.resultPath});
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-storm-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/pom.xml b/flink-contrib/flink-storm-compatibility/pom.xml
new file mode 100644
index 0000000..e50825b
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-contrib-parent</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-storm-compatibility-parent</artifactId>
+ <name>flink-storm-compatibility</name>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>flink-storm-compatibility-core</module>
+ <module>flink-storm-compatibility-examples</module>
+ </modules>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-streaming-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/pom.xml b/flink-contrib/flink-streaming-contrib/pom.xml
new file mode 100644
index 0000000..49a9479
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-contrib-parent</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-streaming-contrib</artifactId>
+ <name>flink-streaming-contrib</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/.hidden b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/.hidden
new file mode 100644
index 0000000..e69de29