You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/07/22 13:48:15 UTC

[1/2] flink git commit: [FLINK-2304] Add named attribute access to Storm compatibility layer - extended FlinkTuple to enable named attribute access - extended BoltWrapper for user defined input schema - extended FlinkTopologyBuilder to handle decla

Repository: flink
Updated Branches:
  refs/heads/master 148395bcd -> 03320503e


http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
index d39a526..f028266 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
@@ -19,16 +19,19 @@ package org.apache.flink.stormcompatibility.wordcount;
 
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
+
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
 import org.apache.flink.stormcompatibility.util.OutputFormatter;
 import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
 import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
-import org.apache.flink.stormcompatibility.util.StormFileSpout;
-import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
+import org.apache.flink.stormcompatibility.util.StormWordCountFileSpout;
+import org.apache.flink.stormcompatibility.util.StormWordCountInMemorySpout;
 import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
 import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounter;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounterByName;
 import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizerByName;
 
 /**
  * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
@@ -55,6 +58,10 @@ public class WordCountTopology {
 	private final static OutputFormatter formatter = new TupleOutputFormatter();
 
 	public static FlinkTopologyBuilder buildTopology() {
+		return buildTopology(true);
+	}
+
+	public static FlinkTopologyBuilder buildTopology(boolean indexOrName) {
 
 		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
 
@@ -63,16 +70,25 @@ public class WordCountTopology {
 			// read the text file from given input path
 			final String[] tokens = textPath.split(":");
 			final String inputFile = tokens[tokens.length - 1];
-			builder.setSpout(spoutId, new StormFileSpout(inputFile));
+			builder.setSpout(spoutId, new StormWordCountFileSpout(inputFile));
 		} else {
-			builder.setSpout(spoutId, new StormInMemorySpout(WordCountData.WORDS));
+			builder.setSpout(spoutId, new StormWordCountInMemorySpout(WordCountData.WORDS));
 		}
 
-		// split up the lines in pairs (2-tuples) containing: (word,1)
-		builder.setBolt(tokenierzerId, new StormBoltTokenizer(), 4).shuffleGrouping(spoutId);
-		// group by the tuple field "0" and sum up tuple field "1"
-		builder.setBolt(counterId, new StormBoltCounter(), 4).fieldsGrouping(tokenierzerId,
-				new Fields(StormBoltTokenizer.ATTRIBUTE_WORD));
+		if (indexOrName) {
+			// split up the lines in pairs (2-tuples) containing: (word,1)
+			builder.setBolt(tokenierzerId, new StormBoltTokenizer(), 4).shuffleGrouping(spoutId);
+			// group by the tuple field "0" and sum up tuple field "1"
+			builder.setBolt(counterId, new StormBoltCounter(), 4).fieldsGrouping(tokenierzerId,
+					new Fields(StormBoltTokenizer.ATTRIBUTE_WORD));
+		} else {
+			// split up the lines in pairs (2-tuples) containing: (word,1)
+			builder.setBolt(tokenierzerId, new StormBoltTokenizerByName(), 4).shuffleGrouping(
+					spoutId);
+			// group by the named tuple field "word" and sum up the per-word count field
+			builder.setBolt(counterId, new StormBoltCounterByName(), 4).fieldsGrouping(
+					tokenierzerId, new Fields(StormBoltTokenizerByName.ATTRIBUTE_WORD));
+		}
 
 		// emit result
 		if (fileInputOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java
new file mode 100644
index 0000000..bf940c3
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements the word counter that counts the occurrence of each unique word. The bolt takes a pair (input tuple schema:
+ * {@code <String,Integer>}) and sums the given word count for each unique word (output tuple schema:
+ * {@code <String,Integer>} ).
+ */
+public class StormBoltCounterByName implements IRichBolt {
+	private static final long serialVersionUID = 399619605462625934L;
+
+	public static final String ATTRIBUTE_WORD = "word";
+	public static final String ATTRIBUTE_COUNT = "count";
+
+	private final HashMap<String, Count> counts = new HashMap<String, Count>();
+	private OutputCollector collector;
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
+		this.collector = collector;
+	}
+
+	@Override
+	public void execute(final Tuple input) {
+		final String word = input.getStringByField(StormBoltTokenizer.ATTRIBUTE_WORD);
+
+		Count currentCount = this.counts.get(word);
+		if (currentCount == null) {
+			currentCount = new Count();
+			this.counts.put(word, currentCount);
+		}
+		currentCount.count += input.getIntegerByField(StormBoltTokenizer.ATTRIBUTE_COUNT);
+
+		this.collector.emit(new Values(word, currentCount.count));
+	}
+
+	@Override
+	public void cleanup() {/* nothing to do */}
+
+	@Override
+	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+	/**
+	 * A counter helper to emit immutable tuples to the given stormCollector and avoid unnecessary object
+	 * creation/deletion.
+	 */
+	private static final class Count {
+		public int count;
+
+		public Count() {/* nothing to do */}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
index 96bd87c..dfb3e37 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
@@ -31,6 +31,8 @@ import java.util.Map;
  * Implements the string tokenizer that splits sentences into words as a Storm bolt. The bolt takes a line (input tuple
  * schema: {@code <String>}) and splits it into multiple pairs in the form of "(word,1)" (output tuple schema:
  * {@code <String,Integer>}).
+ * <p>
+ * Same as {@link StormBoltTokenizerByName}, but accesses input attribute by index (instead of name).
  */
 public final class StormBoltTokenizer implements IRichBolt {
 	private static final long serialVersionUID = -8589620297208175149L;

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java
new file mode 100644
index 0000000..8796b95
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Implements the string tokenizer that splits sentences into words as a Storm bolt. The bolt takes a line (input tuple
+ * schema: {@code <String>}) and splits it into multiple pairs in the form of "(word,1)" (output tuple schema:
+ * {@code <String,Integer>}).
+ * <p>
+ * Same as {@link StormBoltTokenizer}, but accesses input attribute by name (instead of index).
+ */
+public final class StormBoltTokenizerByName implements IRichBolt {
+	private static final long serialVersionUID = -8589620297208175149L;
+
+	public static final String ATTRIBUTE_WORD = "word";
+	public static final String ATTRIBUTE_COUNT = "count";
+
+	public static final int ATTRIBUTE_WORD_INDEX = 0;
+	public static final int ATTRIBUTE_COUNT_INDEX = 1;
+
+	private OutputCollector collector;
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
+		this.collector = collector;
+	}
+
+	@Override
+	public void execute(final Tuple input) {
+		final String[] tokens = input.getStringByField("sentence").toLowerCase().split("\\W+");
+
+		for (final String token : tokens) {
+			if (token.length() > 0) {
+				this.collector.emit(new Values(token, 1));
+			}
+		}
+	}
+
+	@Override
+	public void cleanup() {/* nothing to do */}
+
+	@Override
+	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataPojos.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataPojos.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataPojos.java
new file mode 100644
index 0000000..f965a28
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataPojos.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import java.io.Serializable;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+
+public class WordCountDataPojos {
+	public static Sentence[] SENTENCES;
+
+	static {
+		SENTENCES = new Sentence[WordCountData.WORDS.length];
+		for (int i = 0; i < SENTENCES.length; ++i) {
+			SENTENCES[i] = new Sentence(WordCountData.WORDS[i]);
+		}
+	}
+
+	public static class Sentence implements Serializable {
+		private static final long serialVersionUID = -7336372859203407522L;
+
+		private String sentence;
+
+		public Sentence() {
+		}
+
+		public Sentence(String sentence) {
+			this.sentence = sentence;
+		}
+
+		public String getSentence() {
+			return sentence;
+		}
+
+		public void setSentence(String sentence) {
+			this.sentence = sentence;
+		}
+
+		@Override
+		public String toString() {
+			return "(" + this.sentence + ")";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataTuple.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataTuple.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataTuple.java
new file mode 100644
index 0000000..732f0ae
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/WordCountDataTuple.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount.stormoperators;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+
+@SuppressWarnings("unchecked")
+public class WordCountDataTuple {
+	public static Tuple1<String>[] TUPLES;
+
+	static {
+		TUPLES = new Tuple1[WordCountData.WORDS.length];
+		for (int i = 0; i < TUPLES.length; ++i) {
+			TUPLES[i] = new Tuple1<String>(WordCountData.WORDS[i]);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
new file mode 100644
index 0000000..dc75c25
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojoITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class BoltTokenizerWordCountPojoITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+		this.resultPath = this.getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		BoltTokenizerWordCountPojo.main(new String[]{this.textPath, this.resultPath});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
new file mode 100644
index 0000000..e147f53
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNamesITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class BoltTokenizerWordCountWithNamesITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+		this.resultPath = this.getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		BoltTokenizerWordCountWithNames.main(new String[]{this.textPath, this.resultPath});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
new file mode 100644
index 0000000..8b9a729
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocalNamedITCase.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
+import org.apache.flink.stormcompatibility.api.FlinkTestCluster;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class StormWordCountLocalNamedITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		FlinkLocalCluster.initialize(new FlinkTestCluster());
+		this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+		this.resultPath = this.getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		StormWordCountNamedLocal.main(new String[] { this.textPath, this.resultPath });
+	}
+
+}


[2/2] flink git commit: [FLINK-2304] Add named attribute access to Storm compatibility layer - extended FlinkTuple to enable named attribute access - extended BoltWrapper for user defined input schema - extended FlinkTopologyBuilder to handle decla

Posted by gy...@apache.org.
[FLINK-2304] Add named attribute access to Storm compatibility layer
  - extended FlinkTuple to enable named attribute access
  - extended BoltWrapper for user defined input schema
  - extended FlinkTopologyBuilder to handle declared output schemas
  - adapted JUnit tests
  - added new examples and ITCases
  - updated READMEs
  - updated documentation

Closes #878


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/03320503
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/03320503
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/03320503

Branch: refs/heads/master
Commit: 03320503e20c2038412c889d9d8b61821d4963af
Parents: 148395b
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Mon Jun 29 17:50:07 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Jul 22 13:47:22 2015 +0200

----------------------------------------------------------------------
 docs/apis/storm_compatibility.md                |  33 +-
 .../flink-storm-compatibility-core/README.md    |   1 -
 .../api/FlinkOutputFieldsDeclarer.java          |   2 +-
 .../api/FlinkTopologyBuilder.java               |   8 +-
 .../wrappers/AbstractStormCollector.java        |   2 +-
 .../wrappers/StormBoltWrapper.java              |  75 +++-
 .../stormcompatibility/wrappers/StormTuple.java | 110 ++++-
 .../wrappers/StormBoltWrapperTest.java          |  10 +-
 .../wrappers/StormTupleTest.java                | 444 ++++++++++++++++---
 .../README.md                                   |   6 +-
 .../stormcompatibility/util/StormFileSpout.java |   2 +-
 .../util/StormInMemorySpout.java                |   2 +-
 .../util/StormWordCountFileSpout.java           |  38 ++
 .../util/StormWordCountInMemorySpout.java       |  39 ++
 .../wordcount/BoltTokenizerWordCount.java       |   4 +-
 .../wordcount/BoltTokenizerWordCountPojo.java   | 135 ++++++
 .../BoltTokenizerWordCountWithNames.java        | 138 ++++++
 .../wordcount/SpoutSourceWordCount.java         |   4 +-
 .../wordcount/StormWordCountNamedLocal.java     |  76 ++++
 .../wordcount/WordCountTopology.java            |  34 +-
 .../stormoperators/StormBoltCounterByName.java  |  88 ++++
 .../stormoperators/StormBoltTokenizer.java      |   2 +
 .../StormBoltTokenizerByName.java               |  78 ++++
 .../stormoperators/WordCountDataPojos.java      |  59 +++
 .../stormoperators/WordCountDataTuple.java      |  34 ++
 .../BoltTokenizerWordCountPojoITCase.java       |  45 ++
 .../BoltTokenizerWordCountWithNamesITCase.java  |  45 ++
 .../StormWordCountLocalNamedITCase.java         |  48 ++
 28 files changed, 1430 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index 0f6b17b..b8fe66e 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -34,7 +34,7 @@ This document shows how to use existing Storm code with Flink.
 * This will be replaced by the TOC
 {:toc}
 
-### Project Configuration
+# Project Configuration
 
 Support for Storm is contained in the `flink-storm-compatibility-core` Maven module.
 The code resides in the `org.apache.flink.stormcompatibility` package.
@@ -51,7 +51,7 @@ Add the following dependency to your `pom.xml` if you want to execute Storm code
 
 **Please note**: `flink-storm-compatibility-core` is not part of the provided binary Flink distribution. Thus, you need to include `flink-storm-compatibility-core` classes (and their dependencies) in your program jar that is submitted to Flink's JobManager.
 
-### Execute Storm Topologies
+# Execute Storm Topologies
 
 Flink provides a Storm compatible API (`org.apache.flink.stormcompatibility.api`) that offers replacements for the following classes:
 
@@ -88,18 +88,18 @@ if(runLocal) { // submit to test cluster
 </div>
 </div>
 
-### Embed Storm Operators in Flink Streaming Programs 
+# Embed Storm Operators in Flink Streaming Programs 
 
 As an alternative, Spouts and Bolts can be embedded into regular streaming programs.
 The Storm compatibility layer offers a wrapper class for each, namely `StormSpoutWrapper` and `StormBoltWrapper` (`org.apache.flink.stormcompatibility.wrappers`).
 
-Per default, both wrappers convert Storm output tuples to Flink's `Tuple` types (ie, `Tuple1` to `Tuple25` according to the number of fields of the Storm tuples).
+Per default, both wrappers convert Storm output tuples to Flink's [Tuple](programming_guide.html#tuples-and-case-classes) types (ie, `Tuple1` to `Tuple25` according to the number of fields of the Storm tuples).
 For single field output tuples a conversion to the field's data type is also possible (eg, `String` instead of `Tuple1<String>`).
 
 Because Flink cannot infer the output field types of Storm operators, it is required to specify the output type manually.
 In order to get the correct `TypeInformation` object, Flink's `TypeExtractor` can be used.
 
-#### Embed Spouts
+## Embed Spouts
 
 In order to use a Spout as Flink source, use `StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)`.
 The Spout object is handed to the constructor of `StormSpoutWrapper<OUT>` that serves as first argument to `addSource(...)`.
@@ -126,7 +126,7 @@ Using `StormFiniteSpoutWrapper` allows the Flink program to shut down automatica
 If `StormSpoutWrapper` is used, the program will run until it is [canceled](cli.html) manually.
 
 
-#### Embed Bolts
+## Embed Bolts
 
 In order to use a Bolt as Flink operator, use `DataStream.transform(String, TypeInformation, OneInputStreamOperator)`.
 The Bolt object is handed to the constructor of `StormBoltWrapper<IN,OUT>` that serves as last argument to `transform(...)`.
@@ -149,7 +149,26 @@ DataStream<Tuple2<String, Integer>> counts = text.transform(
 </div>
 </div>
 
-### Storm Compatibility Examples
+### Named Attribute Access for Embedded Bolts
+
+Bolts can access input tuple fields by name (in addition to access by index).
+To use this feature with embedded Bolts, you need to have either a
+
+ 1. [POJO](programming_guide.html#pojos) type input stream or
+ 2. [Tuple](programming_guide.html#tuples-and-case-classes) type input stream and specify the input schema (ie, name-to-index-mapping)
+
+For POJO input types, Flink accesses the fields via reflection.
+For this case, Flink expects either a corresponding public member variable or public getter method.
+For example, if a Bolt accesses a field via name `sentence` (eg, `String s = input.getStringByField("sentence");`), the input POJO class must have a member variable `public String sentence;` or method `public String getSentence() { ... };` (pay attention to camel-case naming).
+
+For `Tuple` input types, it is required to specify the input schema using Storm's `Fields` class.
+For this case, the constructor of `StormBoltWrapper` takes an additional argument: `new StormBoltWrapper<Tuple1<String>, Tuple2<String, Integer>>(new StormBoltTokenizerByName(), new Fields("sentence"))`.
+The input type is `Tuple1<String>` and `Fields("sentence")` specifies that `input.getStringByField("sentence")` is equivalent to `input.getString(0)`.
+
+See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.  
+
+# Storm Compatibility Examples
 
 You can find more examples in Maven module `flink-storm-compatibilty-examples`.
+For the different versions of WordCount, see [README.md](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md).
 

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
index 0d490a3..04d8934 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
@@ -5,7 +5,6 @@ The Storm compatibility layer allows to embed spouts or bolt unmodified within a
 The following Strom features are not (yet/fully) supported by the compatibility layer right now:
 * the spout/bolt configuration within `open()`/`prepare()` is not yet supported (ie, `Map conf` parameter)
 * topology and tuple meta information (ie, `TopologyContext` not fully supported)
-* access to tuple attributes (ie, fields) only by index (access by name is coming)
 * only default stream is supported currently (ie, only a single output stream)
 * no fault-tolerance guarantees (ie, calls to `ack()`/`fail()` and anchoring is ignored)
 * for whole Storm topologies the following is not supported by Flink:

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
index 7661ab9..49d73f8 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
@@ -37,7 +37,7 @@ import java.util.List;
 final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
 
 	/** the declared output schema */
-	private Fields outputSchema;
+	Fields outputSchema;
 
 	@Override
 	public void declare(final Fields fields) {

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index 239c5eb..6c39561 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -30,6 +30,8 @@ import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.IRichStateSpout;
 import backtype.storm.topology.SpoutDeclarer;
 import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
 import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
@@ -60,6 +62,8 @@ public class FlinkTopologyBuilder {
 	private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
 	/** All user bolts by their ID */
 	private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
+	/** All declared output schemas by operator ID */
+	private final HashMap<String, Fields> outputSchemas = new HashMap<String, Fields>();
 
 	/**
 	 * Creates a Flink program that used the specified spouts and bolts.
@@ -79,6 +83,7 @@ public class FlinkTopologyBuilder {
 
 			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
 			userSpout.declareOutputFields(declarer);
+			this.outputSchemas.put(spoutId, declarer.outputSchema);
 
 			/* TODO in order to support multiple output streams, use an additional wrapper (or modify StormSpoutWrapper
 			 * and StormCollector)
@@ -118,6 +123,7 @@ public class FlinkTopologyBuilder {
 
 				final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
 				userBolt.declareOutputFields(declarer);
+				this.outputSchemas.put(boltId, declarer.outputSchema);
 
 				final ComponentCommon common = stormTopolgoy.get_bolts().get(boltId).get_common();
 
@@ -162,7 +168,7 @@ public class FlinkTopologyBuilder {
 						final TypeInformation<?> outType = declarer.getOutputType();
 
 						final SingleOutputStreamOperator operator = inputDataStream.transform(boltId, outType,
-								new StormBoltWrapper(userBolt));
+								new StormBoltWrapper(userBolt, this.outputSchemas.get(producerId)));
 						if (outType != null) {
 							// only for non-sink nodes
 							availableOperators.put(boltId, operator);

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
index e8048b0..dc77ca1 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
@@ -68,7 +68,7 @@ abstract class AbstractStormCollector<OUT> {
 			}
 		} else {
 			throw new UnsupportedOperationException(
-					"SimpleStormBoltWrapper can handle not more then 25 attributes, but "
+					"Flink cannot handle more then 25 attributes, but "
 					+ this.numberOfAttributes + " are declared by the given bolt");
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
index c7b87ba..8bcdae0 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
@@ -14,12 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.stormcompatibility.wrappers;
 
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
+import backtype.storm.tuple.Fields;
 
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
@@ -29,7 +29,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 
 
@@ -53,6 +52,8 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 	private final IRichBolt bolt;
 	/** Number of attributes of the bolt's output tuples */
 	private final int numberOfAttributes;
+	/** The schema (ie, ordered field names) of the input stream. */
+	private final Fields inputSchema;
 
 	/**
 	 *  We have to use this because Operators must output
@@ -61,9 +62,10 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 	private TimestampedCollector<OUT> flinkCollector;
 
 	/**
-	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt}
-	 * such that it can be used within a Flink streaming program. The output type will be one of
-	 * {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
+	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
+	 * used within a Flink streaming program. As no input schema is defined, attribute-by-name access is only possible
+	 * for POJO input types. The output type will be one of {@link Tuple1} to {@link Tuple25} depending on the bolt's
+	 * declared number of attributes.
 	 * 
 	 * @param bolt
 	 * 		The Storm {@link IRichBolt bolt} to be used.
@@ -71,15 +73,33 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 	 * 		If the number of declared output attributes is not with range [1;25].
 	 */
 	public StormBoltWrapper(final IRichBolt bolt) throws IllegalArgumentException {
-		this(bolt, false);
+		this(bolt, null, false);
 	}
 
 	/**
-	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt}
-	 * such that it can be used within a Flink streaming program. The output type can be any type if
-	 * parameter {@code rawOutput} is {@code true} and the bolt's number of declared output tuples
-	 * is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to
-	 * {@link Tuple25} depending on the bolt's declared number of attributes.
+	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
+	 * used within a Flink streaming program. The given input schema enables attribute-by-name access for input types
+	 * {@link Tuple1} to {@link Tuple25}. The output type will be one of {@link Tuple1} to {@link Tuple25} depending on
+	 * the bolt's declared number of attributes.
+	 * 
+	 * @param bolt
+	 * 		The Storm {@link IRichBolt bolt} to be used.
+	 * @param inputSchema
+	 * 		The schema (ie, ordered field names) of the input stream.
+	 * @throws IllegalArgumentException
+	 * 		If the number of declared output attributes is not within range [1;25].
+	 */
+	public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema)
+			throws IllegalArgumentException {
+		this(bolt, inputSchema, false);
+	}
+
+	/**
+	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
+	 * used within a Flink streaming program. As no input schema is defined, attribute-by-name access is only possible
+	 * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the
+	 * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one
+	 * of {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
 	 * 
 	 * @param bolt
 	 * 		The Storm {@link IRichBolt bolt} to be used.
@@ -91,8 +111,34 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 	 * 		not 1 or if {@code rawOuput} is {@code false} and the number of declared output
 	 * 		attributes is not with range [1;25].
 	 */
-	public StormBoltWrapper(final IRichBolt bolt, final boolean rawOutput) throws IllegalArgumentException {
+	public StormBoltWrapper(final IRichBolt bolt, final boolean rawOutput)
+			throws IllegalArgumentException {
+		this(bolt, null, rawOutput);
+	}
+
+	/**
+	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
+	 * used within a Flink streaming program. The given input schema enables attribute-by-name access for input types
+	 * {@link Tuple1} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
+	 * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
+	 * be one of {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
+	 * 
+	 * @param bolt
+	 * 		The Storm {@link IRichBolt bolt} to be used.
+	 * @param inputSchema
+	 * 		The schema (ie, ordered field names) of the input stream.
+	 * @param rawOutput
+	 * 		Set to {@code true} if a single attribute output stream should not be of type
+	 * 		{@link Tuple1} but be of a raw type.
+	 * @throws IllegalArgumentException
+	 * 		If {@code rawOutput} is {@code true} and the number of declared output attributes is
+	 * 		not 1 or if {@code rawOutput} is {@code false} and the number of declared output
+	 * 		attributes is not within range [1;25].
+	 */
+	public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema, final boolean rawOutput)
+			throws IllegalArgumentException {
 		this.bolt = bolt;
+		this.inputSchema = inputSchema;
 		this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutput);
 	}
 
@@ -101,7 +147,7 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 		super.open(parameters);
 
 		final TopologyContext topologyContext = StormWrapperSetupHelper.convertToTopologyContext(
-				(StreamingRuntimeContext)super.runtimeContext, false);
+				super.runtimeContext, false);
 		flinkCollector = new TimestampedCollector<OUT>(output);
 		OutputCollector stormCollector = null;
 
@@ -122,11 +168,12 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 	@Override
 	public void processElement(final StreamRecord<IN> element) throws Exception {
 		flinkCollector.setTimestamp(element.getTimestamp());
-		this.bolt.execute(new StormTuple<IN>(element.getValue()));
+		this.bolt.execute(new StormTuple<IN>(element.getValue(), inputSchema));
 	}
 
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
 		output.emitWatermark(mark);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
index 51db745..07d94b4 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
@@ -30,6 +30,8 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.MessageId;
 import backtype.storm.tuple.Values;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.List;
 
 /**
@@ -37,17 +39,23 @@ import java.util.List;
  */
 class StormTuple<IN> implements backtype.storm.tuple.Tuple {
 
-	/** The storm representation of the original Flink tuple */
+	/** The Storm representation of the original Flink tuple */
 	private final Values stormTuple;
+	/** The schema (ie, ordered field names) of the tuple */
+	private final Fields schema;
 
 	/**
-	 * Create a new Storm tuple from the given Flink tuple.
-	 *
+	 * Create a new Storm tuple from the given Flink tuple. The provided {@code schema} is ignored for raw input
+	 * types.
+	 * 
 	 * @param flinkTuple
 	 * 		The Flink tuple to be converted.
+	 * @param schema
+	 * 		The schema (ie, ordered field names) of the tuple.
 	 */
-	public StormTuple(final IN flinkTuple) {
+	public StormTuple(final IN flinkTuple, final Fields schema) {
 		if (flinkTuple instanceof org.apache.flink.api.java.tuple.Tuple) {
+			this.schema = schema;
 			final org.apache.flink.api.java.tuple.Tuple t = (org.apache.flink.api.java.tuple.Tuple) flinkTuple;
 
 			final int numberOfAttributes = t.getArity();
@@ -56,6 +64,7 @@ class StormTuple<IN> implements backtype.storm.tuple.Tuple {
 				this.stormTuple.add(t.getField(i));
 			}
 		} else {
+			this.schema = null;
 			this.stormTuple = new Values(flinkTuple);
 		}
 	}
@@ -67,22 +76,38 @@ class StormTuple<IN> implements backtype.storm.tuple.Tuple {
 
 	@Override
 	public boolean contains(final String field) {
-		throw new UnsupportedOperationException("Not implemented yet");
+		if (this.schema != null) {
+			return this.schema.contains(field);
+		}
+
+		try {
+			this.getPublicMemberField(field);
+			return true;
+		} catch (NoSuchFieldException f) {
+			try {
+				this.getGetterMethod(field);
+				return true;
+			} catch (Exception g) {
+				return false;
+			}
+		} catch (Exception e) {
+			return false;
+		}
 	}
 
 	@Override
 	public Fields getFields() {
-		throw new UnsupportedOperationException("Not implemented yet");
+		return this.schema;
 	}
 
 	@Override
 	public int fieldIndex(final String field) {
-		throw new UnsupportedOperationException("Not implemented yet");
+		return this.schema.fieldIndex(field);
 	}
 
 	@Override
 	public List<Object> select(final Fields selector) {
-		throw new UnsupportedOperationException("Not implemented yet");
+		return this.schema.select(selector, this.stormTuple);
 	}
 
 	@Override
@@ -135,54 +160,103 @@ class StormTuple<IN> implements backtype.storm.tuple.Tuple {
 		return (byte[]) this.stormTuple.get(i);
 	}
 
+	private Field getPublicMemberField(final String field) throws Exception {
+		assert (this.stormTuple.size() == 1);
+		return this.stormTuple.get(0).getClass().getField(field);
+	}
+
+	private Method getGetterMethod(final String field) throws Exception {
+		assert (this.stormTuple.size() == 1);
+		return this.stormTuple
+				.get(0)
+				.getClass()
+				.getMethod("get" + Character.toUpperCase(field.charAt(0)) + field.substring(1),
+						(Class<?>[]) null);
+	}
+
+	private Object getValueByPublicMember(final String field) throws Exception {
+		assert (this.stormTuple.size() == 1);
+		return getPublicMemberField(field).get(this.stormTuple.get(0));
+	}
+
+	private Object getValueByGetter(final String field) throws Exception {
+		assert (this.stormTuple.size() == 1);
+		return getGetterMethod(field).invoke(this.stormTuple.get(0), (Object[]) null);
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> T getValueByName(final String field) {
+		if (this.schema != null) {
+			return (T) this.getValue(this.schema.fieldIndex(field));
+		}
+		assert (this.stormTuple.size() == 1);
+
+		Exception e;
+		try {
+			// try public member
+			return (T) getValueByPublicMember(field);
+		} catch (NoSuchFieldException f) {
+			try {
+				// try getter-method
+				return (T) getValueByGetter(field);
+			} catch (Exception g) {
+				e = g;
+			}
+		} catch (Exception f) {
+			e = f;
+		}
+
+		throw new RuntimeException("Could not access field <" + field + ">", e);
+	}
+
 	@Override
 	public Object getValueByField(final String field) {
-		throw new UnsupportedOperationException("Not implemented yet");
+		return getValueByName(field);
 	}
 
 	@Override
 	public String getStringByField(final String field) {
-		throw new UnsupportedOperationException("Not implemented yet");
+		return getValueByName(field);
 	}
 
 	@Override
 	public Integer getIntegerByField(final String field) {
-		throw new UnsupportedOperationException("Not implemented yet");
+		return getValueByName(field);
 	}
 
 	@Override
 	public Long getLongByField(final String field) {
-		throw new UnsupportedOperationException("Not implemented yet");
+		return getValueByName(field);
 	}
 
 	@Override
 	public Boolean getBooleanByField(final String field) {
-		throw new UnsupportedOperationException("Not implemented yet");
+		return getValueByName(field);
 	}
 
 	@Override
 	public Short getShortByField(final String field) {
-		throw new UnsupportedOperationException("Not implemented yet");
+		return getValueByName(field);
 	}
 
 	@Override
 	public Byte getByteByField(final String field) {
-		throw new UnsupportedOperationException("Not implemented yet");
+		return getValueByName(field);
 	}
 
 	@Override
 	public Double getDoubleByField(final String field) {
-		throw new UnsupportedOperationException("Not implemented yet");
+		return getValueByName(field);
 	}
 
 	@Override
 	public Float getFloatByField(final String field) {
-		throw new UnsupportedOperationException("Not implemented yet");
+		return getValueByName(field);
 	}
 
 	@Override
 	public byte[] getBinaryByField(final String field) {
-		throw new UnsupportedOperationException("Not implemented yet");
+		return getValueByName(field);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
index dd56c4d..3e55d23 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -47,7 +47,6 @@ import static org.mockito.Mockito.when;
 @PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class})
 public class StormBoltWrapperTest {
 
-	@SuppressWarnings("unused")
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrapperRawType() throws Exception {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
@@ -57,7 +56,6 @@ public class StormBoltWrapperTest {
 		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), true);
 	}
 
-	@SuppressWarnings("unused")
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrapperToManyAttributes1() throws Exception {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
@@ -71,7 +69,6 @@ public class StormBoltWrapperTest {
 		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class));
 	}
 
-	@SuppressWarnings("unused")
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrapperToManyAttributes2() throws Exception {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
@@ -127,16 +124,17 @@ public class StormBoltWrapperTest {
 		declarer.declare(new Fields(schema));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
-		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt);
+		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, null);
 		wrapper.setup(mock(Output.class), taskContext);
 		wrapper.open(new Configuration());
 
 		wrapper.processElement(record);
 		if (numberOfAttributes == 0) {
-			verify(bolt).execute(eq(new StormTuple<String>(rawTuple)));
+			verify(bolt).execute(eq(new StormTuple<String>(rawTuple, null)));
 		} else {
-			verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple)));
+			verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple, null)));
 		}
+
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
index 14b1c60..96e7b35 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
@@ -21,17 +21,36 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.util.ArrayList;
 import java.util.List;
 
+import static org.mockito.Mockito.mock;
+
 public class StormTupleTest extends AbstractTest {
+	private static final String fieldName = "fieldName";
+	private static final String fieldNamePojo = "member";
+
+	private int arity, index;
+
+	@Override
+	@Before
+	public void prepare() {
+		super.prepare();
+		this.arity = 1 + r.nextInt(25);
+		this.index = r.nextInt(this.arity);
+	}
 
 	@Test
 	public void nonTupleTest() {
 		final Object flinkTuple = this.r.nextInt();
 
-		final StormTuple<Object> tuple = new StormTuple<Object>(flinkTuple);
+		final StormTuple<Object> tuple = new StormTuple<Object>(flinkTuple, null);
 		Assert.assertSame(flinkTuple, tuple.getValue(0));
 
 		final List<Object> values = tuple.getValues();
@@ -50,7 +69,7 @@ public class StormTupleTest extends AbstractTest {
 			flinkTuple.setField(data[i], i);
 		}
 
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
 		final List<Object> values = tuple.getValues();
 
 		Assert.assertEquals(numberOfAttributes, values.size());
@@ -70,7 +89,7 @@ public class StormTupleTest extends AbstractTest {
 		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
 		flinkTuple.setField(data, index);
 
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index));
 	}
 
@@ -78,7 +97,7 @@ public class StormTupleTest extends AbstractTest {
 	public void testBoolean() {
 		final Boolean flinkTuple = this.r.nextBoolean();
 
-		final StormTuple<Boolean> tuple = new StormTuple<Boolean>(flinkTuple);
+		final StormTuple<Boolean> tuple = new StormTuple<Boolean>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple, tuple.getBoolean(0));
 	}
 
@@ -86,7 +105,7 @@ public class StormTupleTest extends AbstractTest {
 	public void testByte() {
 		final Byte flinkTuple = (byte) this.r.nextInt();
 
-		final StormTuple<Byte> tuple = new StormTuple<Byte>(flinkTuple);
+		final StormTuple<Byte> tuple = new StormTuple<Byte>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple, tuple.getByte(0));
 	}
 
@@ -94,7 +113,7 @@ public class StormTupleTest extends AbstractTest {
 	public void testDouble() {
 		final Double flinkTuple = this.r.nextDouble();
 
-		final StormTuple<Double> tuple = new StormTuple<Double>(flinkTuple);
+		final StormTuple<Double> tuple = new StormTuple<Double>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple, tuple.getDouble(0));
 	}
 
@@ -102,7 +121,7 @@ public class StormTupleTest extends AbstractTest {
 	public void testFloat() {
 		final Float flinkTuple = this.r.nextFloat();
 
-		final StormTuple<Float> tuple = new StormTuple<Float>(flinkTuple);
+		final StormTuple<Float> tuple = new StormTuple<Float>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple, tuple.getFloat(0));
 	}
 
@@ -110,7 +129,7 @@ public class StormTupleTest extends AbstractTest {
 	public void testInteger() {
 		final Integer flinkTuple = this.r.nextInt();
 
-		final StormTuple<Integer> tuple = new StormTuple<Integer>(flinkTuple);
+		final StormTuple<Integer> tuple = new StormTuple<Integer>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple, tuple.getInteger(0));
 	}
 
@@ -118,7 +137,7 @@ public class StormTupleTest extends AbstractTest {
 	public void testLong() {
 		final Long flinkTuple = this.r.nextLong();
 
-		final StormTuple<Long> tuple = new StormTuple<Long>(flinkTuple);
+		final StormTuple<Long> tuple = new StormTuple<Long>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple, tuple.getLong(0));
 	}
 
@@ -126,7 +145,7 @@ public class StormTupleTest extends AbstractTest {
 	public void testShort() {
 		final Short flinkTuple = (short) this.r.nextInt();
 
-		final StormTuple<Short> tuple = new StormTuple<Short>(flinkTuple);
+		final StormTuple<Short> tuple = new StormTuple<Short>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple, tuple.getShort(0));
 	}
 
@@ -136,7 +155,7 @@ public class StormTupleTest extends AbstractTest {
 		this.r.nextBytes(data);
 		final String flinkTuple = new String(data);
 
-		final StormTuple<String> tuple = new StormTuple<String>(flinkTuple);
+		final StormTuple<String> tuple = new StormTuple<String>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple, tuple.getString(0));
 	}
 
@@ -149,7 +168,7 @@ public class StormTupleTest extends AbstractTest {
 		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
 		flinkTuple.setField(data, index);
 
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index));
 	}
 
@@ -161,7 +180,7 @@ public class StormTupleTest extends AbstractTest {
 		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
 		flinkTuple.setField(data, index);
 
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple.getField(index), tuple.getBoolean(index));
 	}
 
@@ -173,7 +192,7 @@ public class StormTupleTest extends AbstractTest {
 		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
 		flinkTuple.setField(data, index);
 
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple.getField(index), tuple.getByte(index));
 	}
 
@@ -185,7 +204,7 @@ public class StormTupleTest extends AbstractTest {
 		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
 		flinkTuple.setField(data, index);
 
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple.getField(index), tuple.getDouble(index));
 	}
 
@@ -197,7 +216,7 @@ public class StormTupleTest extends AbstractTest {
 		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
 		flinkTuple.setField(data, index);
 
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple.getField(index), tuple.getFloat(index));
 	}
 
@@ -209,7 +228,7 @@ public class StormTupleTest extends AbstractTest {
 		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
 		flinkTuple.setField(data, index);
 
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple.getField(index), tuple.getInteger(index));
 	}
 
@@ -221,7 +240,7 @@ public class StormTupleTest extends AbstractTest {
 		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
 		flinkTuple.setField(data, index);
 
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple.getField(index), tuple.getLong(index));
 	}
 
@@ -233,7 +252,7 @@ public class StormTupleTest extends AbstractTest {
 		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
 		flinkTuple.setField(data, index);
 
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple.getField(index), tuple.getShort(index));
 	}
 
@@ -247,103 +266,394 @@ public class StormTupleTest extends AbstractTest {
 		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
 		flinkTuple.setField(data, index);
 
-		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
 		Assert.assertEquals(flinkTuple.getField(index), tuple.getString(index));
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testContains() {
-		new StormTuple<Object>(null).contains(null);
+	@Test
+	public void testContains() throws Exception {
+		Fields schema = new Fields("a1", "a2");
+		StormTuple<Object> tuple = new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
+				schema);
+
+		Assert.assertTrue(tuple.contains("a1"));
+		Assert.assertTrue(tuple.contains("a2"));
+		Assert.assertFalse(tuple.contains("a3"));
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetFields() {
-		new StormTuple<Object>(null).getFields();
+	@Test
+	public void testGetFields() throws Exception {
+		Fields schema = new Fields();
+
+		Assert.assertSame(schema, new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
+				schema).getFields());
+
+		Assert.assertSame(null, new StormTuple<Object>(null, schema).getFields());
+
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testFieldIndex() {
-		new StormTuple<Object>(null).fieldIndex(null);
+	@Test
+	public void testFieldIndex() throws Exception {
+		Fields schema = new Fields("a1", "a2");
+		StormTuple<Object> tuple = new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(),
+				schema);
+
+		Assert.assertEquals(0, tuple.fieldIndex("a1"));
+		Assert.assertEquals(1, tuple.fieldIndex("a2"));
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testSelect() {
-		new StormTuple<Object>(null).select(null);
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testSelect() throws Exception {
+		Tuple tuple = Tuple.getTupleClass(arity).newInstance();
+		Values values = new Values();
+
+		ArrayList<String> attributeNames = new ArrayList<String>(arity);
+		ArrayList<String> filterNames = new ArrayList<String>(arity);
+
+		for (int i = 0; i < arity; ++i) {
+			tuple.setField(i, i);
+			attributeNames.add("a" + i);
+
+			if (r.nextBoolean()) {
+				filterNames.add("a" + i);
+				values.add(i);
+			}
+		}
+		Fields schema = new Fields(attributeNames);
+		Fields selector = new Fields(filterNames);
+
+		Assert.assertEquals(values, new StormTuple(tuple, schema).select(selector));
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetValueByField() {
-		new StormTuple<Object>(null).getValueByField(null);
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetValueByField() throws Exception {
+		Object value = mock(Object.class);
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getValueByField(fieldName));
+
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetStringByField() {
-		new StormTuple<Object>(null).getStringByField(null);
+	@Test
+	public void testGetValueByFieldPojo() throws Exception {
+		Object value = mock(Object.class);
+		TestPojoMember<Object> pojo = new TestPojoMember<Object>(value);
+		StormTuple<TestPojoMember<Object>> tuple = new StormTuple<TestPojoMember<Object>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getValueByField(fieldNamePojo));
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetIntegerByField() {
-		new StormTuple<Object>(null).getIntegerByField(null);
+	@Test
+	public void testGetValueByFieldPojoGetter() throws Exception {
+		Object value = mock(Object.class);
+		TestPojoGetter<Object> pojo = new TestPojoGetter<Object>(value);
+		StormTuple<TestPojoGetter<Object>> tuple = new StormTuple<TestPojoGetter<Object>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getValueByField(fieldNamePojo));
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetLongByField() {
-		new StormTuple<Object>(null).getLongByField(null);
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetStringByField() throws Exception {
+		String value = "stringValue";
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getStringByField(fieldName));
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetBooleanByField() {
-		new StormTuple<Object>(null).getBooleanByField(null);
+	@Test
+	public void testGetStringByFieldPojo() throws Exception {
+		String value = "stringValue";
+		TestPojoMember<String> pojo = new TestPojoMember<String>(value);
+		StormTuple<TestPojoMember<String>> tuple = new StormTuple<TestPojoMember<String>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getStringByField(fieldNamePojo));
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetShortByField() {
-		new StormTuple<Object>(null).getShortByField(null);
+	@Test
+	public void testGetStringByFieldPojoGetter() throws Exception {
+		String value = "stringValue";
+		TestPojoGetter<String> pojo = new TestPojoGetter<String>(value);
+		StormTuple<TestPojoGetter<String>> tuple = new StormTuple<TestPojoGetter<String>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getStringByField(fieldNamePojo));
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetByteByField() {
-		new StormTuple<Object>(null).getByteByField(null);
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetIntegerByField() throws Exception {
+		Integer value = r.nextInt();
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getIntegerByField(fieldName));
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetDoubleByField() {
-		new StormTuple<Object>(null).getDoubleByField(null);
+	@Test
+	public void testGetIntegerByFieldPojo() throws Exception {
+		Integer value = r.nextInt();
+		TestPojoMember<Integer> pojo = new TestPojoMember<Integer>(value);
+		StormTuple<TestPojoMember<Integer>> tuple = new StormTuple<TestPojoMember<Integer>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getIntegerByField(fieldNamePojo));
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetFloatByField() {
-		new StormTuple<Object>(null).getFloatByField(null);
+	@Test
+	public void testGetIntegerByFieldPojoGetter() throws Exception {
+		Integer value = r.nextInt();
+		TestPojoGetter<Integer> pojo = new TestPojoGetter<Integer>(value);
+		StormTuple<TestPojoGetter<Integer>> tuple = new StormTuple<TestPojoGetter<Integer>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getIntegerByField(fieldNamePojo));
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetBinaryByField() {
-		new StormTuple<Object>(null).getBinaryByField(null);
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetLongByField() throws Exception {
+		Long value = r.nextLong();
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getLongByField(fieldName));
+	}
+
+	@Test
+	public void testGetLongByFieldPojo() throws Exception {
+		Long value = r.nextLong();
+		TestPojoMember<Long> pojo = new TestPojoMember<Long>(value);
+		StormTuple<TestPojoMember<Long>> tuple = new StormTuple<TestPojoMember<Long>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getLongByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetLongByFieldPojoGetter() throws Exception {
+		Long value = r.nextLong();
+		TestPojoGetter<Long> pojo = new TestPojoGetter<Long>(value);
+		StormTuple<TestPojoGetter<Long>> tuple = new StormTuple<TestPojoGetter<Long>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getLongByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetBooleanByField() throws Exception {
+		Boolean value = r.nextBoolean();
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertEquals(value, tuple.getBooleanByField(fieldName));
+	}
+
+	@Test
+	public void testGetBooleanByFieldPojo() throws Exception {
+		Boolean value = r.nextBoolean();
+		TestPojoMember<Boolean> pojo = new TestPojoMember<Boolean>(value);
+		StormTuple<TestPojoMember<Boolean>> tuple = new StormTuple<TestPojoMember<Boolean>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getBooleanByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetBooleanByFieldPojoGetter() throws Exception {
+		Boolean value = r.nextBoolean();
+		TestPojoGetter<Boolean> pojo = new TestPojoGetter<Boolean>(value);
+		StormTuple<TestPojoGetter<Boolean>> tuple = new StormTuple<TestPojoGetter<Boolean>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getBooleanByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetShortByField() throws Exception {
+		Short value = (short) r.nextInt();
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getShortByField(fieldName));
+	}
+
+	@Test
+	public void testGetShortByFieldPojo() throws Exception {
+		Short value = (short) r.nextInt();
+		TestPojoMember<Short> pojo = new TestPojoMember<Short>(value);
+		StormTuple<TestPojoMember<Short>> tuple = new StormTuple<TestPojoMember<Short>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getShortByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetShortByFieldPojoGetter() throws Exception {
+		Short value = (short) r.nextInt();
+		TestPojoGetter<Short> pojo = new TestPojoGetter<Short>(value);
+		StormTuple<TestPojoGetter<Short>> tuple = new StormTuple<TestPojoGetter<Short>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getShortByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetByteByField() throws Exception {
+		Byte value = new Byte((byte) r.nextInt());
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getByteByField(fieldName));
+	}
+
+	@Test
+	public void testGetByteByFieldPojo() throws Exception {
+		Byte value = new Byte((byte) r.nextInt());
+		TestPojoMember<Byte> pojo = new TestPojoMember<Byte>(value);
+		StormTuple<TestPojoMember<Byte>> tuple = new StormTuple<TestPojoMember<Byte>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getByteByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetByteByFieldPojoGetter() throws Exception {
+		Byte value = new Byte((byte) r.nextInt());
+		TestPojoGetter<Byte> pojo = new TestPojoGetter<Byte>(value);
+		StormTuple<TestPojoGetter<Byte>> tuple = new StormTuple<TestPojoGetter<Byte>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getByteByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetDoubleByField() throws Exception {
+		Double value = r.nextDouble();
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getDoubleByField(fieldName));
+	}
+
+	@Test
+	public void testGetDoubleByFieldPojo() throws Exception {
+		Double value = r.nextDouble();
+		TestPojoMember<Double> pojo = new TestPojoMember<Double>(value);
+		StormTuple<TestPojoMember<Double>> tuple = new StormTuple<TestPojoMember<Double>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getDoubleByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetDoubleByFieldPojoGetter() throws Exception {
+		Double value = r.nextDouble();
+		TestPojoGetter<Double> pojo = new TestPojoGetter<Double>(value);
+		StormTuple<TestPojoGetter<Double>> tuple = new StormTuple<TestPojoGetter<Double>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getDoubleByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetFloatByField() throws Exception {
+		Float value = r.nextFloat();
+		StormTuple tuple = testGetByField(arity, index, value);
+		Assert.assertSame(value, tuple.getFloatByField(fieldName));
+	}
+
+	@Test
+	public void testGetFloatByFieldPojo() throws Exception {
+		Float value = r.nextFloat();
+		TestPojoMember<Float> pojo = new TestPojoMember<Float>(value);
+		StormTuple<TestPojoMember<Float>> tuple = new StormTuple<TestPojoMember<Float>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getFloatByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetFloatByFieldPojoGetter() throws Exception {
+		Float value = r.nextFloat();
+		TestPojoGetter<Float> pojo = new TestPojoGetter<Float>(value);
+		StormTuple<TestPojoGetter<Float>> tuple = new StormTuple<TestPojoGetter<Float>>(pojo,
+				null);
+		Assert.assertSame(value, tuple.getFloatByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testGetBinaryByField() throws Exception {
+		byte[] data = new byte[1 + r.nextInt(20)];
+		r.nextBytes(data);
+		StormTuple tuple = testGetByField(arity, index, data);
+		Assert.assertSame(data, tuple.getBinaryByField(fieldName));
+	}
+
+	@Test
+	public void testGetBinaryFieldPojo() throws Exception {
+		byte[] data = new byte[1 + r.nextInt(20)];
+		r.nextBytes(data);
+		TestPojoMember<byte[]> pojo = new TestPojoMember<byte[]>(data);
+		StormTuple<TestPojoMember<byte[]>> tuple = new StormTuple<TestPojoMember<byte[]>>(pojo,
+				null);
+		Assert.assertSame(data, tuple.getBinaryByField(fieldNamePojo));
+	}
+
+	@Test
+	public void testGetBinaryByFieldPojoGetter() throws Exception {
+		byte[] data = new byte[1 + r.nextInt(20)];
+		r.nextBytes(data);
+		TestPojoGetter<byte[]> pojo = new TestPojoGetter<byte[]>(data);
+		StormTuple<TestPojoGetter<byte[]>> tuple = new StormTuple<TestPojoGetter<byte[]>>(pojo,
+				null);
+		Assert.assertSame(data, tuple.getBinaryByField(fieldNamePojo));
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	private <T> StormTuple testGetByField(int arity, int index, T value)
+			throws Exception {
+
+		assert (index < arity);
+
+		Tuple tuple = Tuple.getTupleClass(arity).newInstance();
+		tuple.setField(value, index);
+
+		ArrayList<String> attributeNames = new ArrayList<String>(arity);
+		for(int i = 0; i < arity; ++i) {
+			if(i == index) {
+				attributeNames.add(fieldName);
+			} else {
+				attributeNames.add("" + i);
+			}
+		}
+		Fields schema = new Fields(attributeNames);
+
+		return new StormTuple(tuple, schema);
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
 	public void testGetSourceGlobalStreamid() {
-		new StormTuple<Object>(null).getSourceGlobalStreamid();
+		new StormTuple<Object>(null, null).getSourceGlobalStreamid();
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
 	public void testGetSourceComponent() {
-		new StormTuple<Object>(null).getSourceComponent();
+		new StormTuple<Object>(null, null).getSourceComponent();
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
 	public void testGetSourceTask() {
-		new StormTuple<Object>(null).getSourceTask();
+		new StormTuple<Object>(null, null).getSourceTask();
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
 	public void testGetSourceStreamId() {
-		new StormTuple<Object>(null).getSourceStreamId();
+		new StormTuple<Object>(null, null).getSourceStreamId();
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
 	public void testGetMessageId() {
-		new StormTuple<Object>(null).getMessageId();
+		new StormTuple<Object>(null, null).getMessageId();
+	}
+
+	public static class TestPojoMember<T> {
+		public T member;
+
+		public TestPojoMember(T value) {
+			this.member = value;
+		}
 	}
 
+	public static class TestPojoGetter<T> {
+		private T member;
+
+		public TestPojoGetter(T value) {
+			this.member = value;
+		}
+
+		public T getMember() {
+			return this.member;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
index a4d8885..c5e501b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md
@@ -1,12 +1,16 @@
 # flink-storm-examples
 
-This module contains three versions of a simple word-count-example to illustrate the usage of the compatibility layer:
+This module contains multiple versions of a simple word-count-example to illustrate the usage of the compatibility layer:
 * the usage of spouts or bolt within a regular Flink streaming program (ie, embedded spouts or bolts)
    1. `SpoutSourceWordCount` uses a spout as data source within a Flink streaming program
    2. `BoltTokenizeerWordCount` uses a bolt to split sentences into words within a Flink streaming program
+      * `BoltTokenizerWordCountWithNames` uses a Tuple input type and accesses attributes by field name (rather than by index)
+      * `BoltTokenizerWordCountPojo` uses a POJO input type and accesses attributes by field name (rather than by index)
+
 * how to submit a whole Storm topology to Flink
    3. `WordCountTopology` plugs a Storm topology together
       * `StormWordCountLocal` submits the topology to a local Flink cluster (similiar to a `LocalCluster` in Storm)
+        (`StormWordCountNamedLocal` accesses attributes by field name rather than by index)
       * `StormWordCountRemoteByClient` submits the topology to a remote Flink cluster (simliar to the usage of `NimbusClient` in Storm)
       * `StormWordCountRemoteBySubmitter` submits the topology to a remote Flink cluster (simliar to the usage of `StormSubmitter` in Storm)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
index c38b599..f52a7bd 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
@@ -30,7 +30,7 @@ import java.util.Map;
 /**
  * Implements a Storm Spout that reads data from a given local file.
  */
-public final class StormFileSpout extends AbstractStormSpout {
+public class StormFileSpout extends AbstractStormSpout {
 	private static final long serialVersionUID = -6996907090003590436L;
 
 	private final String path;

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
index 3e6081c..99ef324 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
@@ -23,7 +23,7 @@ import org.apache.flink.examples.java.wordcount.util.WordCountData;
 /**
  * Implements a Storm Spout that reads data from {@link WordCountData#WORDS}.
  */
-public final class StormInMemorySpout extends AbstractStormSpout {
+public class StormInMemorySpout extends AbstractStormSpout {
 	private static final long serialVersionUID = -4008858647468647019L;
 
 	private String[] source;

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountFileSpout.java
new file mode 100644
index 0000000..1fc4023
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountFileSpout.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+/**
+ * Implements a Storm Spout that reads data from a given local file and declares a single output attribute named "sentence".
+ */
+public final class StormWordCountFileSpout extends StormFileSpout {
+	private static final long serialVersionUID = 2372251989250954503L;
+
+	public StormWordCountFileSpout(String path) {
+		super(path);
+		// TODO Auto-generated constructor stub
+	}
+
+	@Override
+	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("sentence"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountInMemorySpout.java
new file mode 100644
index 0000000..408cbfb
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormWordCountInMemorySpout.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+/**
+ * Implements a Storm Spout that reads data from a given in-memory source (e.g., {@link WordCountData#WORDS}) and declares a single output attribute named "sentence".
+ */
+public final class StormWordCountInMemorySpout extends StormInMemorySpout {
+	private static final long serialVersionUID = 8832143302409465843L;
+
+	public StormWordCountInMemorySpout(String[] source) {
+		super(source);
+	}
+
+	@Override
+	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("sentence"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
index 606a3ce..8f4503f 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
@@ -67,8 +67,8 @@ public class BoltTokenizerWordCount {
 				.transform("StormBoltTokenizer",
 						TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
 						new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer()))
-						// split up the lines in pairs (2-tuples) containing: (word,1)
-						// group by the tuple field "0" and sum up tuple field "1"
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				// group by the tuple field "0" and sum up tuple field "1"
 				.groupBy(0).sum(1);
 
 		// emit result

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
new file mode 100644
index 0000000..befb18f
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import backtype.storm.topology.IRichBolt;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizerByName;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.WordCountDataPojos;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.WordCountDataPojos.Sentence;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}. In contrast to
+ * {@link BoltTokenizerWordCount} the tokenizer's input is a POJO type and the single field is accessed by name.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>access attributes by name for POJO type input streams
+ * </ul>
+ */
+public class BoltTokenizerWordCountPojo {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		final DataStream<Sentence> text = getTextDataStream(env);
+
+		final DataStream<Tuple2<String, Integer>> counts = text
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				// this is done by a Storm bolt that is wrapped accordingly
+				.transform("StormBoltTokenizer",
+						TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+						new StormBoltWrapper<Sentence, Tuple2<String, Integer>>(
+								new StormBoltTokenizerByName()))
+				// aggregate the (word,1) pairs emitted by the bolt:
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0).sum(1);
+
+		// emit result
+		if (fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("Streaming WordCount with Storm bolt tokenizer");
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: BoltTokenizerWordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing BoltTokenizerWordCount example with built-in default data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: BoltTokenizerWordCount <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<Sentence> getTextDataStream(final StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			TypeInformation<Sentence> sourceType = TypeExtractor
+					.getForObject(new Sentence(""));
+			return env.createInput(new CsvInputFormat<Sentence>(new Path(
+					textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
+					CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
+					sourceType);
+		}
+
+		return env.fromElements(WordCountDataPojos.SENTENCES);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
new file mode 100644
index 0000000..8483f48
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.tuple.Fields;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizerByName;
+import org.apache.flink.stormcompatibility.wordcount.stormoperators.WordCountDataTuple;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}. In contrast to
+ * {@link BoltTokenizerWordCount} the tokenizer's input is a {@link Tuple} type and the single field is accessed by
+ * name.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>access attributes by name for {@link Tuple} type input streams
+ * </ul>
+ */
+public class BoltTokenizerWordCountWithNames {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		final DataStream<Tuple1<String>> text = getTextDataStream(env);
+
+		final DataStream<Tuple2<String, Integer>> counts = text
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				// this is done by a Storm bolt that is wrapped accordingly
+				.transform("StormBoltTokenizer",
+						TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+						new StormBoltWrapper<Tuple1<String>, Tuple2<String, Integer>>(
+								new StormBoltTokenizerByName(), new Fields("sentence")))
+				// aggregate the (word,1) pairs emitted by the bolt:
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0).sum(1);
+
+		// emit result
+		if (fileOutput) {
+			counts.writeAsText(outputPath);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("Streaming WordCount with Storm bolt tokenizer");
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: BoltTokenizerWordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing BoltTokenizerWordCount example with built-in default data");
+			System.out.println("  Provide parameters to read input data from a file");
+			System.out.println("  Usage: BoltTokenizerWordCount <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<Tuple1<String>> getTextDataStream(final StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			TypeInformation<Tuple1<String>> sourceType = TypeExtractor
+					.getForObject(new Tuple1<String>(""));
+			return env.createInput(new CsvInputFormat<Tuple1<String>>(new Path(
+					textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER,
+					CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType),
+					sourceType);
+		}
+
+		return env.fromElements(WordCountDataTuple.TUPLES);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
index 0ae51c6..361d83a 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
@@ -67,8 +67,8 @@ public class SpoutSourceWordCount {
 		final DataStream<Tuple2<String, Integer>> counts =
 				// split up the lines in pairs (2-tuples) containing: (word,1)
 				text.flatMap(new Tokenizer())
-						// group by the tuple field "0" and sum up tuple field "1"
-						.groupBy(0).sum(1);
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0).sum(1);
 
 		// emit result
 		if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/03320503/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java
new file mode 100644
index 0000000..f51afab
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wordcount;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.utils.Utils;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology} and submitted to Flink for execution in the
+ * same way as to a Storm {@link LocalCluster}. In contrast to {@link StormWordCountLocal} all bolts access the field of
+ * input tuples by name instead of index.
+ * <p/>
+ * This example shows how to run a program directly within Java; thus, it cannot be used to submit a {@link StormTopology}
+ * via Flink command line clients (i.e., bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>run a regular Storm program locally on Flink
+ * </ul>
+ */
+public class StormWordCountNamedLocal {
+	public final static String topologyId = "Streaming WordCountName";
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!WordCountTopology.parseParameters(args)) {
+			return;
+		}
+
+		// build Topology the Storm way
+		final FlinkTopologyBuilder builder = WordCountTopology.buildTopology(false);
+
+		// execute program locally
+		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
+		cluster.submitTopology(topologyId, null, builder.createTopology());
+
+		Utils.sleep(10 * 1000);
+
+		// TODO kill does not do anything so far
+		cluster.killTopology(topologyId);
+		cluster.shutdown();
+	}
+
+}