You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:31 UTC
[06/15] flink git commit: [Storm Compatibility] Maven module
restructuring and cleanup - removed storm-parent;
renamed storm-core and storm-examples - updated internal Java package
structure * renamed package "stormcompatibility" to "storm" *
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java
new file mode 100644
index 0000000..985cd68
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.excamation;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.storm.excamation.operators.ExclamationBolt;
+
+/**
+ * Implements the "Exclamation" program that attaches exclamation marks to every line of a text file in a streaming
+ * fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology} and submitted to
+ * Flink for execution in the same way as to a Storm {@link backtype.storm.LocalCluster}.
+ * <p/>
+ * This example shows how to run a program directly within Java, thus it cannot be used to submit a
+ * {@link backtype.storm.generated.StormTopology} via Flink command line clients (i.e., bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>ExclamationLocal <text path> <result path></code><br/>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>run a regular Storm program locally on Flink</li>
+ * </ul>
+ */
+public class ExclamationLocal {
+
+ // name under which the topology is submitted to the local cluster
+ public final static String topologyId = "Streaming Exclamation";
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!ExclamationTopology.parseParameters(args)) {
+ return;
+ }
+
+ // build Topology the Storm way
+ final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
+
+ // execute program locally; the exclamation count is handed to the bolts via the Storm config
+ Config conf = new Config();
+ conf.put(ExclamationBolt.EXCLAMATION_COUNT, ExclamationTopology.getExclamation());
+ final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
+ cluster.submitTopology(topologyId, conf, builder.createTopology());
+
+ // let the locally running topology process data for 10 seconds before the JVM exits
+ Utils.sleep(10 * 1000);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java
new file mode 100644
index 0000000..70d25a2
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.excamation;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.storm.excamation.operators.ExclamationBolt;
+import org.apache.flink.storm.util.FiniteFileSpout;
+import org.apache.flink.storm.util.FiniteInMemorySpout;
+import org.apache.flink.storm.util.OutputFormatter;
+import org.apache.flink.storm.util.SimpleOutputFormatter;
+import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.storm.util.BoltPrintSink;
+
+/**
+ * Implements the "Exclamation" program that attaches two exclamation marks to every line of a text file in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>Exclamation[Local|RemoteByClient|RemoteBySubmitter] <text path>
+ * <result path></code><br/>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>construct a regular Storm topology as Flink program</li>
+ * <li>make use of the FiniteSpout interface</li>
+ * </ul>
+ */
+public class ExclamationTopology {
+
+ // component IDs used when wiring up the topology
+ public final static String spoutId = "source";
+ public final static String firstBoltId = "exclamation1";
+ public final static String secondBoltId = "exclamation2";
+ public final static String sinkId = "sink";
+ private final static OutputFormatter formatter = new SimpleOutputFormatter();
+
+ public static FlinkTopologyBuilder buildTopology() {
+ final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+
+ // get input data
+ if (fileInputOutput) {
+ // read the text file from given input path
+ // NOTE(review): split(":") apparently strips an optional URI scheme prefix (e.g. "file:") — verify
+ final String[] tokens = textPath.split(":");
+ final String inputFile = tokens[tokens.length - 1];
+ builder.setSpout(spoutId, new FiniteFileSpout(inputFile));
+ } else {
+ builder.setSpout(spoutId, new FiniteInMemorySpout(WordCountData.WORDS));
+ }
+
+ // two chained exclamation bolts; each one appends marks to every incoming line
+ builder.setBolt(firstBoltId, new ExclamationBolt(), 3).shuffleGrouping(spoutId);
+ builder.setBolt(secondBoltId, new ExclamationBolt(), 2).shuffleGrouping(firstBoltId);
+
+ // emit result
+ if (fileInputOutput) {
+ // write the result to the given output path
+ final String[] tokens = outputPath.split(":");
+ final String outputFile = tokens[tokens.length - 1];
+ builder.setBolt(sinkId, new BoltFileSink(outputFile, formatter))
+ .shuffleGrouping(secondBoltId);
+ } else {
+ builder.setBolt(sinkId, new BoltPrintSink(formatter), 4)
+ .shuffleGrouping(secondBoltId);
+ }
+
+ return builder;
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileInputOutput = false;
+ private static String textPath;
+ private static String outputPath;
+ // number of exclamation marks per bolt; handed to the bolts via config (see ExclamationLocal)
+ private static int exclamationNum = 3;
+
+ static int getExclamation() {
+ return exclamationNum;
+ }
+
+ static boolean parseParameters(final String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileInputOutput = true;
+ if (args.length == 3) {
+ textPath = args[0];
+ outputPath = args[1];
+ exclamationNum = Integer.parseInt(args[2]);
+ } else {
+ System.err.println("Usage: StormExclamation* <text path> <result path> <number of exclamation marks>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing StormExclamation example with built-in default data");
+ System.out.println(" Provide parameters to read input data from a file");
+ System.out.println(" Usage: StormExclamation <text path> <result path> <number of exclamation marks>");
+ }
+
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java
new file mode 100644
index 0000000..01ab907
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.excamation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.excamation.operators.ExclamationBolt;
+import org.apache.flink.storm.util.StormConfig;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import backtype.storm.utils.Utils;
+
+/**
+ * Implements the "Exclamation" program that attaches 3+x exclamation marks to every line of a text file in a streaming
+ * fashion. The program is constructed as a Flink streaming program that embeds a Storm bolt via {@link BoltWrapper}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage:
+ * <code>ExclamationWithBolt <text path> <result path> <number of exclamation marks></code><br/>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData} with x=2.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Bolt within a Flink Streaming program</li>
+ * <li>how to configure a Bolt using StormConfig</li>
+ * </ul>
+ */
+public class ExclamationWithBolt {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // set Storm configuration; the bolt reads EXCLAMATION_COUNT from it in prepare()
+ // NOTE(review): new Integer(...) is a deprecated constructor — Integer.valueOf would be preferred
+ StormConfig config = new StormConfig();
+ config.put(ExclamationBolt.EXCLAMATION_COUNT, new Integer(exclamationNum));
+ env.getConfig().setGlobalJobParameters(config);
+
+ // get input data
+ final DataStream<String> text = getTextDataStream(env);
+
+ // wrap the Storm bolt as a Flink operator (output type String), then append "!!!" via a map
+ final DataStream<String> exclaimed = text
+ .transform("StormBoltTokenizer",
+ TypeExtractor.getForObject(""),
+ new BoltWrapper<String, String>(new ExclamationBolt(),
+ new String[] { Utils.DEFAULT_STREAM_ID }))
+ .map(new ExclamationMap());
+
+ // emit result
+ if (fileOutput) {
+ exclaimed.writeAsText(outputPath);
+ } else {
+ exclaimed.print();
+ }
+
+ // execute program
+ env.execute("Streaming WordCount with bolt tokenizer");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /** Appends three exclamation marks to every input line. */
+ private static class ExclamationMap implements MapFunction<String, String> {
+ private static final long serialVersionUID = 4614754344067170619L;
+
+ @Override
+ public String map(String value) throws Exception {
+ return value + "!!!";
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+ // number of marks the embedded bolt appends (the "x" in 3+x); default 2
+ private static int exclamationNum = 2;
+
+ private static boolean parseParameters(final String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 3) {
+ textPath = args[0];
+ outputPath = args[1];
+ exclamationNum = Integer.parseInt(args[2]);
+ } else {
+ System.err.println("Usage: ExclamationWithBolt <text path> <result path> <number of exclamation marks>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing ExclamationWithBolt example with built-in default data");
+ System.out.println(" Provide parameters to read input data from a file");
+ System.out.println(" Usage: ExclamationWithBolt <text path> <result path> <number of exclamation marks>");
+ }
+ return true;
+ }
+
+ private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+ if (fileOutput) {
+ // read the text file from given input path
+ return env.readTextFile(textPath);
+ }
+
+ return env.fromElements(WordCountData.WORDS);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java
new file mode 100644
index 0000000..22938e5
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.excamation;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.util.FiniteFileSpout;
+import org.apache.flink.storm.util.FiniteInMemorySpout;
+import org.apache.flink.storm.util.StormConfig;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import backtype.storm.utils.Utils;
+
+/**
+ * Implements the "Exclamation" program that attaches six exclamation marks to every line of a text file in a streaming
+ * fashion. The program is constructed as a Flink streaming program that embeds a Storm spout via {@link SpoutWrapper}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>ExclamationWithSpout <text path> <result path></code><br/>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Storm spout within a Flink Streaming program</li>
+ * <li>make use of the FiniteSpout interface</li>
+ * <li>how to configure a Spout using StormConfig</li>
+ * </ul>
+ */
+public class ExclamationWithSpout {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ final DataStream<String> text = getTextDataStream(env);
+
+ // two chained maps, each appending "!!!" -> six exclamation marks in total
+ final DataStream<String> exclaimed = text
+ .map(new ExclamationMap())
+ .map(new ExclamationMap());
+
+ // emit result
+ if (fileOutput) {
+ exclaimed.writeAsText(outputPath);
+ } else {
+ exclaimed.print();
+ }
+
+ // execute program
+ env.execute("Streaming Exclamation with Storm spout source");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /** Appends three exclamation marks to every input line. */
+ private static class ExclamationMap implements MapFunction<String, String> {
+ private static final long serialVersionUID = -684993133807698042L;
+
+ @Override
+ public String map(String value) throws Exception {
+ return value + "!!!";
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(final String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: ExclamationWithSpout <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing ExclamationWithSpout example with built-in default data");
+ System.out.println(" Provide parameters to read input data from a file");
+ System.out.println(" Usage: ExclamationWithSpout <text path> <result path>");
+ }
+ return true;
+ }
+
+ private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+ if (fileOutput) {
+ // NOTE(review): split(":") apparently strips an optional URI scheme prefix (e.g. "file:") — verify
+ final String[] tokens = textPath.split(":");
+ final String inputFile = tokens[tokens.length - 1];
+
+ // set Storm configuration; the spout reads INPUT_FILE_PATH from it
+ StormConfig config = new StormConfig();
+ config.put(FiniteFileSpout.INPUT_FILE_PATH, inputFile);
+ env.getConfig().setGlobalJobParameters(config);
+
+ return env.addSource(
+ new SpoutWrapper<String>(new FiniteFileSpout(),
+ new String[] { Utils.DEFAULT_STREAM_ID }),
+ TypeExtractor.getForClass(String.class)).setParallelism(1);
+ }
+
+ return env.addSource(
+ new SpoutWrapper<String>(new FiniteInMemorySpout(
+ WordCountData.WORDS), new String[] { Utils.DEFAULT_STREAM_ID }),
+ TypeExtractor.getForClass(String.class)).setParallelism(1);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java
new file mode 100644
index 0000000..cfc49a1
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.excamation.operators;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * A Storm bolt that appends a configurable number of exclamation marks to the first
+ * (String) field of each input tuple. The count is read from the topology configuration
+ * under {@link #EXCLAMATION_COUNT}; if the key is absent, a single '!' is appended.
+ */
+public class ExclamationBolt implements IRichBolt {
+ private final static long serialVersionUID = -6364882114201311380L;
+
+ // config key under which the number of exclamation marks is passed in
+ public final static String EXCLAMATION_COUNT = "exclamation.count";
+
+ private OutputCollector collector;
+ // string of '!' characters appended to every input line; built once in prepare()
+ private String exclamation;
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+
+ // build the exclamation string once from the configured count (default: single '!')
+ Object count = conf.get(EXCLAMATION_COUNT);
+ if (count != null) {
+ int exclamationNum = (Integer) count;
+ StringBuilder builder = new StringBuilder();
+ for (int index = 0; index < exclamationNum; ++index) {
+ builder.append('!');
+ }
+ this.exclamation = builder.toString();
+ } else {
+ this.exclamation = "!";
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ // emit the decorated line, anchored to the input tuple
+ collector.emit(tuple, new Values(tuple.getString(0) + this.exclamation));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java
new file mode 100644
index 0000000..560fe51
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.split;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.split.operators.RandomSpout;
+import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
+import org.apache.flink.storm.util.SplitStreamMapper;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements a simple example with two declared output streams for the embedded spout.
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>handle multiple output streams of a spout</li>
+ * <li>access each stream by .split(...) and .select(...)</li>
+ * <li>strip wrapper data type SplitStreamType for further processing in Flink</li>
+ * </ul>
+ * <p/>
+ * This example would work the same way for multiple bolt output streams.
+ */
+public class SpoutSplitExample {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // the two stream names the spout declares (even/odd numbers)
+ String[] rawOutputs = new String[] { RandomSpout.EVEN_STREAM, RandomSpout.ODD_STREAM };
+
+ final DataStream<SplitStreamType<Integer>> numbers = env.addSource(
+ new SpoutWrapper<SplitStreamType<Integer>>(new RandomSpout(true, 0),
+ rawOutputs), TypeExtractor.getForObject(new SplitStreamType<Integer>()));
+
+ // route each record by the Storm stream it was emitted on
+ SplitStream<SplitStreamType<Integer>> splitStream = numbers
+ .split(new StormStreamSelector<Integer>());
+
+ DataStream<SplitStreamType<Integer>> evenStream = splitStream.select(RandomSpout.EVEN_STREAM);
+ DataStream<SplitStreamType<Integer>> oddStream = splitStream.select(RandomSpout.ODD_STREAM);
+
+ // even stream: strip the SplitStreamType wrapper in Flink, then enrich with a token
+ evenStream.map(new SplitStreamMapper<Integer>()).returns(Integer.class).map(new Enrich("even")).print();
+ // odd stream: hand the wrapped values to an embedded Storm bolt instead
+ oddStream.transform("oddBolt",
+ TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+ new BoltWrapper<SplitStreamType<Integer>, Tuple2<String, Integer>>(
+ new VerifyAndEnrichBolt(false)))
+ .print();
+
+ // execute program
+ env.execute("Spout split stream example");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Pairs each value with a fixed token, similar to {@link VerifyAndEnrichBolt} but
+ * without the even/odd verification step.
+ */
+ private final static class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 5213888269197438892L;
+ // reused output tuple; field 0 holds the fixed token, field 1 the current value
+ private final Tuple2<String, Integer> out;
+
+ public Enrich(String token) {
+ this.out = new Tuple2<String, Integer>(token, 0);
+ }
+
+ @Override
+ public Tuple2<String, Integer> map(Integer value) throws Exception {
+ this.out.setField(value, 1);
+ return this.out;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
new file mode 100644
index 0000000..d315395
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/RandomSpout.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.split.operators;
+
+import java.util.Map;
+import java.util.Random;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+/**
+ * A spout emitting random integers. When constructed with {@code split = true}, numbers
+ * are emitted on two declared streams ({@link #EVEN_STREAM} / {@link #ODD_STREAM})
+ * according to their parity; otherwise all numbers go to the default stream.
+ */
+public class RandomSpout extends BaseRichSpout {
+ private static final long serialVersionUID = -3978554318742509334L;
+
+ public static final String EVEN_STREAM = "even";
+ public static final String ODD_STREAM = "odd";
+
+ private final boolean split;
+ // NOTE(review): this initializer is redundant — the constructor always replaces it with a seeded Random
+ private Random r = new Random();
+ private SpoutOutputCollector collector;
+
+ public RandomSpout(boolean split, long seed) {
+ this.split = split;
+ this.r = new Random(seed);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+ int i = r.nextInt();
+ if (split) {
+ // route by parity onto the matching declared stream
+ if (i % 2 == 0) {
+ this.collector.emit(EVEN_STREAM, new Values(i));
+ } else {
+ this.collector.emit(ODD_STREAM, new Values(i));
+ }
+ } else {
+ this.collector.emit(new Values(i));
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // declare either the two parity streams or a single default stream, matching nextTuple()
+ Fields schema = new Fields("number");
+ if (split) {
+ declarer.declareStream(EVEN_STREAM, schema);
+ declarer.declareStream(ODD_STREAM, schema);
+ } else {
+ declarer.declare(schema);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
new file mode 100644
index 0000000..99fec4d
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/operators/VerifyAndEnrichBolt.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.split.operators;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class VerifyAndEnrichBolt extends BaseRichBolt {
+ private static final long serialVersionUID = -7277395570966328721L;
+
+ private final boolean evenOrOdd; // true: even -- false: odd
+ private final String token;
+ private OutputCollector collector;
+
+ public VerifyAndEnrichBolt(boolean evenOrOdd) {
+ this.evenOrOdd = evenOrOdd;
+ this.token = evenOrOdd ? "even" : "odd";
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ if ((input.getInteger(0) % 2 == 0) != this.evenOrOdd) {
+ throw new RuntimeException("Invalid number detected.");
+ }
+ this.collector.emit(new Values(this.token, input.getInteger(0)));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("evenOrOdd", "number"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java
new file mode 100644
index 0000000..a6c61d4
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractBoltSink.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+import java.util.Map;
+
+/**
+ * Implements a sink that write the received data so some external output. The result is formatted like
+ * {@code (a1, a2, ..., an)} with {@code Object.toString()} for each attribute).
+ */
+public abstract class AbstractBoltSink implements IRichBolt {
+ private static final long serialVersionUID = -1626323806848080430L;
+
+ private StringBuilder lineBuilder;
+ private String prefix = "";
+ private final OutputFormatter formatter;
+
+ public AbstractBoltSink(final OutputFormatter formatter) {
+ this.formatter = formatter;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public final void prepare(final Map stormConf, final TopologyContext context,
+ final OutputCollector collector) {
+ this.prepareSimple(stormConf, context);
+ if (context.getComponentCommon(context.getThisComponentId()).get_parallelism_hint() > 1) {
+ this.prefix = context.getThisTaskId() + "> ";
+ }
+ }
+
+ protected abstract void prepareSimple(final Map<?, ?> stormConf, final TopologyContext context);
+
+ @Override
+ public final void execute(final Tuple input) {
+ this.lineBuilder = new StringBuilder();
+ this.lineBuilder.append(this.prefix);
+ this.lineBuilder.append(this.formatter.format(input));
+ this.writeExternal(this.lineBuilder.toString());
+ }
+
+ protected abstract void writeExternal(final String line);
+
+ @Override
+ public void cleanup() {/* nothing to do */}
+
+ @Override
+ public final void declareOutputFields(final OutputFieldsDeclarer declarer) {/* nothing to do */}
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java
new file mode 100644
index 0000000..d19ffbf
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/AbstractLineSpout.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+import java.util.Map;
+
/**
 * Base class for Spouts that read data line by line from an arbitrary source. The declared output schema has a single
 * attribute called {@code line} and should be of type {@link String}.
 */
public abstract class AbstractLineSpout implements IRichSpout {
	private static final long serialVersionUID = 8876828403487806771L;

	/** Name of the single output attribute. */
	public final static String ATTRIBUTE_LINE = "line";

	/** Collector used to emit tuples; set in {@link #open}. */
	protected SpoutOutputCollector collector;

	@SuppressWarnings("rawtypes")
	@Override
	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void close() {/* nothing to do */}

	@Override
	public void activate() {/* nothing to do */}

	@Override
	public void deactivate() {/* nothing to do */}

	@Override
	public void ack(final Object msgId) {/* nothing to do */}

	@Override
	public void fail(final Object msgId) {/* nothing to do */}

	@Override
	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields(ATTRIBUTE_LINE));
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		// no component-specific configuration
		return null;
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java
new file mode 100644
index 0000000..5cd3f68
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltFileSink.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.task.TopologyContext;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Implements a sink that write the received data to the given file (as a result of {@code Object.toString()} for each
+ * attribute).
+ */
+public final class BoltFileSink extends AbstractBoltSink {
+ private static final long serialVersionUID = 2014027288631273666L;
+
+ private final String path;
+ private BufferedWriter writer;
+
+ public BoltFileSink(final String path) {
+ this(path, new SimpleOutputFormatter());
+ }
+
+ public BoltFileSink(final String path, final OutputFormatter formatter) {
+ super(formatter);
+ this.path = path;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepareSimple(final Map stormConf, final TopologyContext context) {
+ try {
+ this.writer = new BufferedWriter(new FileWriter(this.path));
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void writeExternal(final String line) {
+ try {
+ this.writer.write(line + "\n");
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ if (this.writer != null) {
+ try {
+ this.writer.close();
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltPrintSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltPrintSink.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltPrintSink.java
new file mode 100644
index 0000000..044246b
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/BoltPrintSink.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.task.TopologyContext;
+
+import java.util.Map;
+
/**
 * Implements a sink that prints the received data to {@code stdout}, one formatted tuple per line.
 */
public final class BoltPrintSink extends AbstractBoltSink {
	private static final long serialVersionUID = -6650011223001009519L;

	public BoltPrintSink(OutputFormatter formatter) {
		super(formatter);
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void prepareSimple(final Map stormConf, final TopologyContext context) {
		/* nothing to do -- stdout needs no setup */
	}

	@Override
	public void writeExternal(final String line) {
		// each formatted line goes to standard out
		System.out.println(line);
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
new file mode 100644
index 0000000..1126a2a
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FileSpout.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Implements a Spout that reads data from a given local file.
+ */
+public class FileSpout extends AbstractLineSpout {
+ private static final long serialVersionUID = -6996907090003590436L;
+
+ public final static String INPUT_FILE_PATH = "input.path";
+
+ protected String path = null;
+ protected BufferedReader reader;
+
+ public FileSpout() {}
+
+ public FileSpout(final String path) {
+ this.path = path;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+ super.open(conf, context, collector);
+
+ Object configuredPath = conf.get(INPUT_FILE_PATH);
+ if(configuredPath != null) {
+ this.path = (String)configuredPath;
+ }
+
+ try {
+ this.reader = new BufferedReader(new FileReader(this.path));
+ } catch (final FileNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (this.reader != null) {
+ try {
+ this.reader.close();
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void nextTuple() {
+ String line;
+ try {
+ line = this.reader.readLine();
+ if (line != null) {
+ this.collector.emit(new Values(line));
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
new file mode 100644
index 0000000..75450c4
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.flink.storm.util.FiniteSpout;
+
+/**
+ * Implements a Spout that reads data from a given local file. The spout stops automatically
+ * when it reached the end of the file.
+ */
+public class FiniteFileSpout extends FileSpout implements FiniteSpout {
+ private static final long serialVersionUID = -1472978008607215864L;
+
+ private String line;
+ private boolean newLineRead;
+
+ public FiniteFileSpout() {}
+
+ public FiniteFileSpout(String path) {
+ super(path);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+ super.open(conf, context, collector);
+ newLineRead = false;
+ }
+
+ @Override
+ public void nextTuple() {
+ this.collector.emit(new Values(line));
+ newLineRead = false;
+ }
+
+ /**
+ * Can be called before nextTuple() any times including 0.
+ */
+ @Override
+ public boolean reachedEnd() {
+ try {
+ readLine();
+ } catch (IOException e) {
+ throw new RuntimeException("Exception occured while reading file " + path);
+ }
+ return line == null;
+ }
+
+ private void readLine() throws IOException {
+ if (!newLineRead) {
+ line = reader.readLine();
+ newLineRead = true;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java
new file mode 100644
index 0000000..1490872
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import org.apache.flink.storm.util.FiniteSpout;
+
+
/**
 * Implements a Spout that reads String[] data stored in memory. The Spout stops automatically when it emitted all of
 * the data.
 */
public class FiniteInMemorySpout extends InMemorySpout<String> implements FiniteSpout {
	// NOTE(review): this value duplicates InMemorySpout's serialVersionUID -- looks copy-pasted; harmless but worth confirming
	private static final long serialVersionUID = -4008858647468647019L;

	public FiniteInMemorySpout(String[] source) {
		super(source);
	}

	/** Signals end-of-stream once all elements of {@code source} have been emitted. */
	@Override
	public boolean reachedEnd() {
		return counter >= source.length;
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/InMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/InMemorySpout.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/InMemorySpout.java
new file mode 100644
index 0000000..5e4c7ba
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/InMemorySpout.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.tuple.Values;
+
+/**
+ * Implements a Spout that reads data stored in memory.
+ */
+public class InMemorySpout<T> extends AbstractLineSpout {
+ private static final long serialVersionUID = -4008858647468647019L;
+
+ protected T[] source;
+ protected int counter = 0;
+
+ public InMemorySpout(T[] source) {
+ this.source = source;
+ }
+
+ @Override
+ public void nextTuple() {
+ if (this.counter < source.length) {
+ this.collector.emit(new Values(source[this.counter++]));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java
new file mode 100644
index 0000000..e696f9b
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/OutputFormatter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+public interface OutputFormatter extends Serializable {
+
+ /**
+ * Converts a Storm {@link Tuple} to a string. This method is used for formatting the output tuples before writing
+ * them out to a file or to the console.
+ *
+ * @param input
+ * The tuple to be formatted
+ * @return The string result of the formatting
+ */
+ public String format(Tuple input);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java
new file mode 100644
index 0000000..cef0081
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/SimpleOutputFormatter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.tuple.Tuple;
+
+public class SimpleOutputFormatter implements OutputFormatter {
+ private static final long serialVersionUID = 6349573860144270338L;
+
+ /**
+ * Converts a Storm {@link Tuple} with 1 field to a string by retrieving the value of that field. This method is
+ * used for formatting raw outputs wrapped in tuples, before writing them out to a file or to the console.
+ *
+ * @param input
+ * The tuple to be formatted
+ * @return The string result of the formatting
+ */
+ @Override
+ public String format(final Tuple input) {
+ if (input.getValues().size() != 1) {
+ throw new RuntimeException("The output is not raw");
+ }
+ return input.getValue(0).toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java
new file mode 100644
index 0000000..5d7ba53
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/TupleOutputFormatter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.tuple.Tuple;
+
+public class TupleOutputFormatter implements OutputFormatter {
+ private static final long serialVersionUID = -599665757723851761L;
+
+ @Override
+ public String format(final Tuple input) {
+ final StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("(");
+ for (final Object attribute : input.getValues()) {
+ stringBuilder.append(attribute);
+ stringBuilder.append(",");
+ }
+ stringBuilder.replace(stringBuilder.length() - 1, stringBuilder.length(), ")");
+ return stringBuilder.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
new file mode 100644
index 0000000..aa3a075
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import backtype.storm.topology.IRichBolt;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.wordcount.operators.BoltTokenizer;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The tokenizer step is performed by a {@link IRichBolt Bolt}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount <text path> <result path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Bolt within a Flink Streaming program.</li>
+ * </ul>
+ */
+public class BoltTokenizerWordCount {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ final DataStream<String> text = getTextDataStream(env);
+
+ final DataStream<Tuple2<String, Integer>> counts = text
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ // this is done by a bolt that is wrapped accordingly
+ .transform("BoltTokenizer",
+ TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+ new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer()))
+ // group by the tuple field "0" and sum up tuple field "1"
+ .keyBy(0).sum(1);
+
+ // emit result
+ if (fileOutput) {
+ counts.writeAsText(outputPath);
+ } else {
+ counts.print();
+ }
+
+ // execute program
+ env.execute("Streaming WordCount with bolt tokenizer");
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(final String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: BoltTokenizerWordCount <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing BoltTokenizerWordCount example with built-in default data");
+ System.out.println(" Provide parameters to read input data from a file");
+ System.out.println(" Usage: BoltTokenizerWordCount <text path> <result path>");
+ }
+ return true;
+ }
+
+ private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+ if (fileOutput) {
+ // read the text file from given input path
+ return env.readTextFile(textPath);
+ }
+
+ return env.fromElements(WordCountData.WORDS);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
new file mode 100644
index 0000000..f72acb3
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import backtype.storm.topology.IRichBolt;
+
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.wordcount.operators.BoltTokenizerByName;
+import org.apache.flink.storm.wordcount.operators.WordCountDataPojos;
+import org.apache.flink.storm.wordcount.operators.WordCountDataPojos.Sentence;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The tokenizer step is performed by a {@link IRichBolt Bolt}. In contrast to {@link BoltTokenizerWordCount}
+ * the tokenizer's input is a POJO type and the single field is accessed by name.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount <text path> <result path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>how to access attributes by name within a Bolt for POJO type input streams
+ * </ul>
+ */
+public class BoltTokenizerWordCountPojo {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ final DataStream<Sentence> text = getTextDataStream(env);
+
+ final DataStream<Tuple2<String, Integer>> counts = text
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ // this is done by a bolt that is wrapped accordingly
+ .transform("BoltTokenizerPojo",
+ TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+ new BoltWrapper<Sentence, Tuple2<String, Integer>>(new BoltTokenizerByName()))
+ // group by the tuple field "0" and sum up tuple field "1"
+ .keyBy(0).sum(1);
+
+ // emit result
+ if (fileOutput) {
+ counts.writeAsText(outputPath);
+ } else {
+ counts.print();
+ }
+
+ // execute program
+ env.execute("Streaming WordCount with POJO bolt tokenizer");
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(final String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: BoltTokenizerWordCountPojo <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out
+ .println("Executing BoltTokenizerWordCountPojo example with built-in default data");
+ System.out.println(" Provide parameters to read input data from a file");
+ System.out.println(" Usage: BoltTokenizerWordCountPojo <text path> <result path>");
+ }
+ return true;
+ }
+
+ private static DataStream<Sentence> getTextDataStream(final StreamExecutionEnvironment env) {
+ if (fileOutput) {
+ // read the text file from given input path
+ PojoTypeInfo<Sentence> sourceType = (PojoTypeInfo<Sentence>) TypeExtractor
+ .getForObject(new Sentence(""));
+ return env.createInput(new CsvInputFormat<Sentence>(new Path(
+ textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
+ CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
+ sourceType);
+ }
+
+ return env.fromElements(WordCountDataPojos.SENTENCES);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
new file mode 100644
index 0000000..7617e95
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.tuple.Fields;
+
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.wordcount.operators.BoltTokenizerByName;
+import org.apache.flink.storm.wordcount.operators.WordCountDataTuple;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The tokenizer step is performed by a {@link IRichBolt Bolt}. In contrast to {@link BoltTokenizerWordCount}
+ * the tokenizer's input is a {@link Tuple} type and the single field is accessed by name.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount <text path> <result path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>how to access attributes by name within a Bolt for {@link Tuple} type input streams
+ * </ul>
+ */
+public class BoltTokenizerWordCountWithNames {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ final DataStream<Tuple1<String>> text = getTextDataStream(env);
+
+ final DataStream<Tuple2<String, Integer>> counts = text
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ // this is done by a Storm bolt that is wrapped accordingly
+ .transform(
+ "BoltTokenizerWithNames",
+ TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+ new BoltWrapper<Tuple1<String>, Tuple2<String, Integer>>(
+ new BoltTokenizerByName(), new Fields("sentence")))
+ // group by the tuple field "0" and sum up tuple field "1"
+ .keyBy(0).sum(1);
+
+ // emit result
+ if (fileOutput) {
+ counts.writeAsText(outputPath);
+ } else {
+ counts.print();
+ }
+
+ // execute program
+ env.execute("Streaming WordCount with schema bolt tokenizer");
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(final String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: BoltTokenizerWordCountWithNames <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing BoltTokenizerWordCountWithNames example with built-in default data");
+ System.out.println(" Provide parameters to read input data from a file");
+ System.out.println(" Usage: BoltTokenizerWordCountWithNames <text path> <result path>");
+ }
+ return true;
+ }
+
+ private static DataStream<Tuple1<String>> getTextDataStream(final StreamExecutionEnvironment env) {
+ if (fileOutput) {
+ // read the text file from given input path
+ TupleTypeInfo<Tuple1<String>> sourceType = (TupleTypeInfo<Tuple1<String>>)TypeExtractor
+ .getForObject(new Tuple1<String>(""));
+ return env.createInput(new CsvInputFormat<Tuple1<String>>(new Path(
+ textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
+ CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
+ sourceType);
+ }
+
+ return env.fromElements(WordCountDataTuple.TUPLES);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
new file mode 100644
index 0000000..bb451fe
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/SpoutSourceWordCount.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.wordcount.operators.WordCountFileSpout;
+import org.apache.flink.storm.wordcount.operators.WordCountInMemorySpout;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The used data source is a {@link IRichSpout Spout}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount <text path> <result path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Spout within a Flink Streaming program.</li>
+ * </ul>
+ */
+public class SpoutSourceWordCount {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ final DataStream<String> text = getTextDataStream(env);
+
+ final DataStream<Tuple2<String, Integer>> counts =
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ text.flatMap(new Tokenizer())
+ // group by the tuple field "0" and sum up tuple field "1"
+ .keyBy(0).sum(1);
+
+ // emit result
+ if (fileOutput) {
+ counts.writeAsText(outputPath);
+ } else {
+ counts.print();
+ }
+
+ // execute program
+ env.execute("Streaming WordCount with spout source");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Implements the string tokenizer that splits sentences into words as a user-defined FlatMapFunction. The function
+ * takes a line (String) and splits it into multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+ */
+ public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(final String value, final Collector<Tuple2<String, Integer>> out) throws Exception {
+ // normalize and split the line
+ final String[] tokens = value.toLowerCase().split("\\W+");
+
+ // emit the pairs
+ for (final String token : tokens) {
+ if (token.length() > 0) {
+ out.collect(new Tuple2<String, Integer>(token, 1));
+ }
+ }
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(final String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: SpoutSourceWordCount <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing SpoutSourceWordCount example with built-in default data");
+ System.out.println(" Provide parameters to read input data from a file");
+ System.out.println(" Usage: SpoutSourceWordCount <text path> <result path>");
+ }
+ return true;
+ }
+
+ private static DataStream<String> getTextDataStream(final StreamExecutionEnvironment env) {
+ if (fileOutput) {
+ // read the text file from given input path
+ final String[] tokens = textPath.split(":");
+ final String localFile = tokens[tokens.length - 1];
+ return env.addSource(
+ new SpoutWrapper<String>(new WordCountFileSpout(localFile),
+ new String[] { Utils.DEFAULT_STREAM_ID }, -1),
+ TypeExtractor.getForClass(String.class)).setParallelism(1);
+ }
+
+ return env.addSource(
+ new SpoutWrapper<String>(new WordCountInMemorySpout(),
+ new String[] { Utils.DEFAULT_STREAM_ID }, -1),
+ TypeExtractor.getForClass(String.class)).setParallelism(1);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
new file mode 100644
index 0000000..18f49c1
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
+ * same way as to a Storm {@link LocalCluster}.
+ * <p/>
+ * This example shows how to run program directly within Java, thus it cannot be used to submit a {@link StormTopology}
+ * via Flink command line clients (ie, bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCountLocal <text path> <result path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>run a regular Storm program locally on Flink</li>
+ * </ul>
+ */
+public class WordCountLocal {
+ public final static String topologyId = "Storm WordCount";
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!WordCountTopology.parseParameters(args)) {
+ return;
+ }
+
+ // build Topology the Storm way
+ final FlinkTopologyBuilder builder = WordCountTopology.buildTopology();
+
+ // execute program locally
+ final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
+ cluster.submitTopology(topologyId, null, builder.createTopology());
+
+ Utils.sleep(10 * 1000);
+
+ // TODO kill does no do anything so far
+ cluster.killTopology(topologyId);
+ cluster.shutdown();
+ }
+
+}