Posted to commits@flink.apache.org by se...@apache.org on 2015/08/21 19:36:04 UTC

[1/4] flink git commit: [FLINK-2306] Add support for named streams in Storm compatibility layer - enabled .declareStream() and connect via stream name - enabled multiple output streams - added .split() / .select() / strip pattern - added helpers in new package utils

Repository: flink
Updated Branches:
  refs/heads/master c9cfb17cb -> 3a8302998


http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/VerifyAndEnrichBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/VerifyAndEnrichBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/VerifyAndEnrichBolt.java
new file mode 100644
index 0000000..5853705
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/VerifyAndEnrichBolt.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.split.stormoperators;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class VerifyAndEnrichBolt extends BaseRichBolt {
+	private static final long serialVersionUID = -7277395570966328721L;
+
+	private final boolean evenOrOdd; // true: even -- false: odd
+	private final String token;
+	private OutputCollector collector;
+
+	public VerifyAndEnrichBolt(boolean evenOrOdd) {
+		this.evenOrOdd = evenOrOdd;
+		this.token = evenOrOdd ? "even" : "odd";
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+		this.collector = collector;
+	}
+
+	@Override
+	public void execute(Tuple input) {
+		if ((input.getInteger(0) % 2 == 0) != this.evenOrOdd) {
+			throw new RuntimeException("Invalid number detected.");
+		}
+		this.collector.emit(new Values(this.token, input.getInteger(0)));
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("evenOrOdd", "number"));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
index 4c012d8..0f04fea 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
@@ -18,6 +18,8 @@
 package org.apache.flink.stormcompatibility.wordcount;
 
 import backtype.storm.topology.IRichSpout;
+import backtype.storm.utils.Utils;
+
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -140,11 +142,14 @@ public class SpoutSourceWordCount {
 			final String[] tokens = textPath.split(":");
 			final String localFile = tokens[tokens.length - 1];
 			return env.addSource(
-					new StormFiniteSpoutWrapper<String>(new StormFileSpout(localFile), true),
+					new StormFiniteSpoutWrapper<String>(new StormFileSpout(localFile),
+							new String[] { Utils.DEFAULT_STREAM_ID }),
 					TypeExtractor.getForClass(String.class)).setParallelism(1);
 		}
 
-		return env.addSource(new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(WordCountData.WORDS), true),
+		return env.addSource(
+				new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(WordCountData.WORDS),
+						new String[] { Utils.DEFAULT_STREAM_ID }),
 				TypeExtractor.getForClass(String.class)).setParallelism(1);
 
 	}
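
The change above replaces the boolean "emit raw type" flag of StormFiniteSpoutWrapper with an array of stream names to subscribe to; passing only Utils.DEFAULT_STREAM_ID keeps the previous behavior for the default stream. A minimal sketch of the new call, assuming env, localFile, and the example classes from the diff above:

    // name the streams to emit; a single selected stream can still be
    // emitted as a raw type (here: String)
    DataStream<String> text = env.addSource(
            new StormFiniteSpoutWrapper<String>(new StormFileSpout(localFile),
                    new String[] { Utils.DEFAULT_STREAM_ID }),
            TypeExtractor.getForClass(String.class)).setParallelism(1);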

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/BoltSplitITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/BoltSplitITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/BoltSplitITCase.java
new file mode 100644
index 0000000..305245b
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/BoltSplitITCase.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.split;
+
+import org.junit.Test;
+
+public class BoltSplitITCase {
+
+	@Test
+	public void testTopology() throws Exception {
+		StormSplitStreamBoltLocal.main(new String[] { "0", "/dev/null" });
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBolt.java
new file mode 100644
index 0000000..c40e054
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBolt.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.split;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class SplitBolt extends BaseRichBolt {
+	private static final long serialVersionUID = -6627606934204267173L;
+
+	public static final String EVEN_STREAM = "even";
+	public static final String ODD_STREAM = "odd";
+
+	private OutputCollector collector;
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+		this.collector = collector;
+	}
+
+	@Override
+	public void execute(Tuple input) {
+		if (input.getInteger(0) % 2 == 0) {
+			this.collector.emit(EVEN_STREAM, new Values(input.getInteger(0)));
+		} else {
+			this.collector.emit(ODD_STREAM, new Values(input.getInteger(0)));
+		}
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		Fields schema = new Fields("number");
+		declarer.declareStream(EVEN_STREAM, schema);
+		declarer.declareStream(ODD_STREAM, schema);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
new file mode 100644
index 0000000..c992b6b
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.split;
+
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+import org.apache.flink.stormcompatibility.split.stormoperators.RandomSpout;
+import org.apache.flink.stormcompatibility.split.stormoperators.VerifyAndEnrichBolt;
+import org.apache.flink.stormcompatibility.util.OutputFormatter;
+import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
+import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
+import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
+
+public class SplitBoltTopology {
+	public final static String spoutId = "randomSource";
+	public final static String boltId = "splitBolt";
+	public final static String evenVerifierId = "evenVerifier";
+	public final static String oddVerifierId = "oddVerifier";
+	public final static String sinkId = "sink";
+	private final static OutputFormatter formatter = new TupleOutputFormatter();
+
+	public static FlinkTopologyBuilder buildTopology() {
+		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+
+		builder.setSpout(spoutId, new RandomSpout(false, seed));
+		builder.setBolt(boltId, new SplitBolt()).shuffleGrouping(spoutId);
+		builder.setBolt(evenVerifierId, new VerifyAndEnrichBolt(true)).shuffleGrouping(boltId,
+				SplitBolt.EVEN_STREAM);
+		builder.setBolt(oddVerifierId, new VerifyAndEnrichBolt(false)).shuffleGrouping(boltId,
+				SplitBolt.ODD_STREAM);
+
+		// emit result
+		if (outputPath != null) {
+			// read the text file from given input path
+			final String[] tokens = outputPath.split(":");
+			final String outputFile = tokens[tokens.length - 1];
+			builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter))
+			.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
+		} else {
+			builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4)
+			.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
+		}
+
+		return builder;
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static long seed = System.currentTimeMillis();
+	private static String outputPath = null;
+
+	static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			if (args.length == 2) {
+				seed = Long.parseLong(args[0]);
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: SplitBoltTopology <seed> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing SplitBoltTopology example with random data");
+			System.out.println("  Usage: SplitBoltTopology <seed> <result path>");
+		}
+
+		return true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitSpoutTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitSpoutTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitSpoutTopology.java
new file mode 100644
index 0000000..613fd10
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitSpoutTopology.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.split;
+
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+import org.apache.flink.stormcompatibility.split.stormoperators.RandomSpout;
+import org.apache.flink.stormcompatibility.split.stormoperators.VerifyAndEnrichBolt;
+import org.apache.flink.stormcompatibility.util.OutputFormatter;
+import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
+import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
+import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
+
+public class SplitSpoutTopology {
+	public final static String spoutId = "randomSplitSource";
+	public final static String evenVerifierId = "evenVerifier";
+	public final static String oddVerifierId = "oddVerifier";
+	public final static String sinkId = "sink";
+	private final static OutputFormatter formatter = new TupleOutputFormatter();
+
+	public static FlinkTopologyBuilder buildTopology() {
+		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+
+		builder.setSpout(spoutId, new RandomSpout(true, seed));
+		builder.setBolt(evenVerifierId, new VerifyAndEnrichBolt(true)).shuffleGrouping(spoutId,
+				RandomSpout.EVEN_STREAM);
+		builder.setBolt(oddVerifierId, new VerifyAndEnrichBolt(false)).shuffleGrouping(spoutId,
+				RandomSpout.ODD_STREAM);
+
+		// emit result
+		if (outputPath != null) {
+			// read the text file from given input path
+			final String[] tokens = outputPath.split(":");
+			final String outputFile = tokens[tokens.length - 1];
+			builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter))
+			.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
+		} else {
+			builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4)
+			.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
+		}
+
+		return builder;
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static long seed = System.currentTimeMillis();
+	private static String outputPath = null;
+
+	static boolean parseParameters(final String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			if (args.length == 2) {
+				seed = Long.parseLong(args[0]);
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: SplitSpoutTopology <seed> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing SplitSpoutTopology example with random data");
+			System.out.println("  Usage: SplitSpoutTopology <seed> <result path>");
+		}
+
+		return true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SpoutSplitITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SpoutSplitITCase.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SpoutSplitITCase.java
new file mode 100644
index 0000000..f30e160
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SpoutSplitITCase.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.split;
+
+import org.junit.Test;
+
+public class SpoutSplitITCase {
+
+	@Test
+	public void testTopology() throws Exception {
+		StormSplitStreamSpoutLocal.main(new String[] { "0", "/dev/null" });
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamBoltLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamBoltLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamBoltLocal.java
new file mode 100644
index 0000000..028f6d1
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamBoltLocal.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.split;
+
+import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+
+import backtype.storm.utils.Utils;
+
+public class StormSplitStreamBoltLocal {
+	public final static String topologyId = "Bolt split stream example";
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!SplitBoltTopology.parseParameters(args)) {
+			return;
+		}
+
+		// build Topology the Storm way
+		final FlinkTopologyBuilder builder = SplitBoltTopology.buildTopology();
+
+		// execute program locally
+		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
+		cluster.submitTopology(topologyId, null, builder.createTopology());
+
+		Utils.sleep(5 * 1000);
+
+		// TODO kill does not do anything so far
+		cluster.killTopology(topologyId);
+		cluster.shutdown();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamSpoutLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamSpoutLocal.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamSpoutLocal.java
new file mode 100644
index 0000000..cc5acd9
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/StormSplitStreamSpoutLocal.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.split;
+
+import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+
+import backtype.storm.utils.Utils;
+
+public class StormSplitStreamSpoutLocal {
+	public final static String topologyId = "Spout split stream example";
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		if (!SplitSpoutTopology.parseParameters(args)) {
+			return;
+		}
+
+		// build Topology the Storm way
+		final FlinkTopologyBuilder builder = SplitSpoutTopology.buildTopology();
+
+		// execute program locally
+		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
+		cluster.submitTopology(topologyId, null, builder.createTopology());
+
+		Utils.sleep(5 * 1000);
+
+		// TODO kill does not do anything so far
+		cluster.killTopology(topologyId);
+		cluster.shutdown();
+	}
+
+}


[3/4] flink git commit: [FLINK-2306] Add support for named streams in Storm compatibility layer - enabled .declareStream() and connect via stream name - enabled multiple output streams - added .split() / .select() / strip pattern - added helpers in new package utils

Posted by se...@apache.org.
[FLINK-2306] Add support for named streams in Storm compatibility layer
 - enabled .declareStream() and connect via stream name
 - enabled multiple output streams
 - added .split() / .select() / strip pattern
 - added helpers in new package utils
 - adapted and extended JUnit tests
 - adapted examples
some minor improvements (FlinkClient, integration of Tuple0)

This closes #1011
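
In condensed form, the named-stream pattern enabled by this commit looks as follows when building a whole topology (wiring condensed from the SplitBolt and SplitBoltTopology examples included below; seed value is a placeholder):

    // declare two named output streams in a bolt
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields("number");
        declarer.declareStream("even", schema);
        declarer.declareStream("odd", schema);
    }

    // connect consumers to a producer via stream name
    FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
    builder.setSpout("randomSource", new RandomSpout(false, 0));
    builder.setBolt("splitBolt", new SplitBolt()).shuffleGrouping("randomSource");
    builder.setBolt("evenVerifier", new VerifyAndEnrichBolt(true))
            .shuffleGrouping("splitBolt", SplitBolt.EVEN_STREAM);
    builder.setBolt("oddVerifier", new VerifyAndEnrichBolt(false))
            .shuffleGrouping("splitBolt", SplitBolt.ODD_STREAM);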


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a830299
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a830299
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a830299

Branch: refs/heads/master
Commit: 3a8302998d2fc7f38504c238916bc7d0dada2320
Parents: a82bd43
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Thu Aug 13 08:56:47 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 21 19:35:39 2015 +0200

----------------------------------------------------------------------
 docs/apis/storm_compatibility.md                |  40 +++-
 .../flink-storm-compatibility-core/README.md    |   1 -
 .../stormcompatibility/api/FlinkClient.java     |  24 ++-
 .../api/FlinkOutputFieldsDeclarer.java          |  70 +++----
 .../stormcompatibility/api/FlinkSubmitter.java  |   2 -
 .../api/FlinkTopologyBuilder.java               | 182 +++++++++++++------
 .../util/FlinkStormStreamSelector.java          |  48 +++++
 .../util/SplitStreamMapper.java                 |  39 ++++
 .../util/SplitStreamType.java                   |  52 ++++++
 .../wrappers/AbstractStormCollector.java        | 118 +++++++-----
 .../wrappers/AbstractStormSpoutWrapper.java     |  41 +++--
 .../wrappers/FiniteStormSpoutWrapper.java       |  85 +++++----
 .../wrappers/StormBoltCollector.java            |  21 ++-
 .../wrappers/StormBoltWrapper.java              | 136 ++++++++++----
 .../wrappers/StormFiniteSpoutWrapper.java       | 138 +++++++++-----
 .../wrappers/StormOutputFieldsDeclarer.java     |  31 +---
 .../wrappers/StormSpoutCollector.java           |  24 +--
 .../wrappers/StormSpoutWrapper.java             |  68 +++++--
 .../wrappers/StormWrapperSetupHelper.java       |  66 ++++---
 .../api/FlinkOutputFieldsDeclarerTest.java      | 113 +++++++-----
 .../api/FlinkTopologyBuilderTest.java           |  48 +++++
 .../flink/stormcompatibility/api/TestBolt.java  |  48 +++++
 .../flink/stormcompatibility/api/TestSpout.java |  59 ++++++
 .../wrappers/FlinkStormStreamSelectorTest.java  |  51 ++++++
 .../wrappers/StormBoltCollectorTest.java        |  26 +--
 .../wrappers/StormBoltWrapperTest.java          | 118 ++++++++++--
 .../wrappers/StormOutputFieldsDeclarerTest.java |  37 ++--
 .../wrappers/StormSpoutCollectorTest.java       |  22 ++-
 .../wrappers/StormTupleTest.java                |   2 +-
 .../wrappers/StormWrapperSetupHelperTest.java   |  47 +++--
 .../flink-storm-compatibility-examples/pom.xml  |   3 +
 .../excamation/ExclamationWithStormBolt.java    |   7 +-
 .../excamation/ExclamationWithStormSpout.java   |  12 +-
 .../split/SpoutSplitExample.java                | 102 +++++++++++
 .../split/stormoperators/RandomSpout.java       |  76 ++++++++
 .../stormoperators/VerifyAndEnrichBolt.java     |  61 +++++++
 .../wordcount/SpoutSourceWordCount.java         |   9 +-
 .../split/BoltSplitITCase.java                  |  28 +++
 .../stormcompatibility/split/SplitBolt.java     |  61 +++++++
 .../split/SplitBoltTopology.java                |  87 +++++++++
 .../split/SplitSpoutTopology.java               |  85 +++++++++
 .../split/SpoutSplitITCase.java                 |  28 +++
 .../split/StormSplitStreamBoltLocal.java        |  51 ++++++
 .../split/StormSplitStreamSpoutLocal.java       |  51 ++++++
 44 files changed, 1918 insertions(+), 500 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index b38667b..3a0c025 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -49,7 +49,9 @@ Add the following dependency to your `pom.xml` if you want to execute Storm code
 </dependency>
 ~~~
 
-**Please note**: `flink-storm-compatibility-core` is not part of the provided binary Flink distribution. Thus, you need to include `flink-storm-compatiblitly-core` classes (and their dependencies) in your program jar that is submitted to Flink's JobManager.
+**Please note**: `flink-storm-compatibility-core` is not part of the provided binary Flink distribution.
+Thus, you need to include the `flink-storm-compatibility-core` classes (and their dependencies) in the program jar that is submitted to Flink's JobManager.
+See *WordCount Storm* within `flink-storm-compatibility-examples/pom.xml` for an example of how to package a jar correctly.
 
 # Execute Storm Topologies
 
@@ -93,7 +95,7 @@ if(runLocal) { // submit to test cluster
 As an alternative, Spouts and Bolts can be embedded into regular streaming programs.
 The Storm compatibility layer offers a wrapper class for each, namely `StormSpoutWrapper` and `StormBoltWrapper` (`org.apache.flink.stormcompatibility.wrappers`).
 
-Per default, both wrappers convert Storm output tuples to Flink's [Tuple](programming_guide.html#tuples-and-case-classes) types (ie, `Tuple1` to `Tuple25` according to the number of fields of the Storm tuples).
+Per default, both wrappers convert Storm output tuples to Flink's [Tuple](programming_guide.html#tuples-and-case-classes) types (ie, `Tuple0` to `Tuple25` according to the number of fields of the Storm tuples).
 For single field output tuples a conversion to the field's data type is also possible (eg, `String` instead of `Tuple1<String>`).
 
 Because Flink cannot infer the output field types of Storm operators, it is required to specify the output type manually.
@@ -112,7 +114,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
 
 // stream has `raw` type (single field output streams only)
 DataStream<String> rawInput = env.addSource(
-	new StormSpoutWrapper<String>(new StormFileSpout(localFilePath), true), // Spout source, 'true' for raw type
+	new StormSpoutWrapper<String>(new StormFileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
 	TypeExtractor.getForClass(String.class)); // output type
 
 // process data stream
@@ -167,6 +169,38 @@ The input type is `Tuple1<String>` and `Fields("sentence")` specify that `input.
 
 See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.  
 
+## Multiple Output Streams
+
+Flink can also handle the declaration of multiple output streams for Spouts and Bolts.
+If a whole topology is executed using `FlinkTopologyBuilder` etc., no special attention is required &ndash; it works as in regular Storm.
+For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitDataStream.select(...)`.
+Flink already provides the predefined output selector `FlinkStormStreamSelector<T>` for `.split(...)`.
+Furthermore, the wrapper type `SplitStreamType<T>` can be removed using `SplitStreamMapper<T>`.
+If a data stream of type `SplitStreamType<T>` is used as input for a Bolt, the wrapper must **not** be removed &ndash; `StormBoltWrapper` removes it automatically.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+[...]
+
+// get DataStream from Spout or Bolt which declares two output streams s1 and s2 with output type SomeType
+DataStream<SplitStreamType<SomeType>> multiStream = ...
+
+SplitDataStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new FlinkStormStreamSelector<SomeType>());
+
+// use SplitStreamMapper to remove the SplitStreamType wrapper and get a data stream of type SomeType
+DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
+// apply a Bolt directly, without stripping the SplitStreamType wrapper
+DataStream<BoltOutputType> s2 = splitStream.select("s2").transform(/* use Bolt for further processing */);
+
+// do further processing on s1 and s2
+[...]
+~~~
+</div>
+</div>
+
+See [SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java) for a full example.
+
 # Flink Extensions
 
 ## Finite Storm Spouts

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
index 04d8934..aef4847 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
@@ -5,7 +5,6 @@ The Storm compatibility layer allows to embed spouts or bolt unmodified within a
 The following Storm features are not (yet/fully) supported by the compatibility layer right now:
 * the spout/bolt configuration within `open()`/`prepare()` is not yet supported (ie, `Map conf` parameter)
 * topology and tuple meta information (ie, `TopologyContext` not fully supported)
-* only default stream is supported currently (ie, only a single output stream)
 * no fault-tolerance guarantees (ie, calls to `ack()`/`fail()` and anchoring is ignored)
 * for whole Storm topologies the following is not supported by Flink:
   * direct emit connection pattern

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
index 5a6e8ca..51a4fa1 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
@@ -30,6 +30,7 @@ import backtype.storm.generated.Nimbus;
 import backtype.storm.generated.NotAliveException;
 import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
+
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
@@ -45,6 +46,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+
 import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -64,6 +66,9 @@ import java.util.Map;
  */
 public class FlinkClient {
 
+	/** The client's configuration */
+	@SuppressWarnings("unused")
+	private final Map<?,?> conf;
 	/** The jobmanager's host name */
 	private final String jobManagerHost;
 	/** The jobmanager's rpc port */
@@ -77,19 +82,24 @@ public class FlinkClient {
 	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for {@link
 	 * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
 	 *
+	 * @param conf
+	 * 		A configuration.
 	 * @param host
 	 * 		The jobmanager's host name.
 	 * @param port
 	 * 		The jobmanager's rpc port.
 	 */
-	public FlinkClient(final String host, final int port) {
-		this(host, port, null);
+	@SuppressWarnings("rawtypes")
+	public FlinkClient(final Map conf, final String host, final int port) {
+		this(conf, host, port, null);
 	}
 
 	/**
 	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for {@link
 	 * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
 	 *
+	 * @param conf
+	 * 		A configuration.
 	 * @param host
 	 * 		The jobmanager's host name.
 	 * @param port
@@ -97,7 +107,9 @@ public class FlinkClient {
 	 * @param timeout
 	 * 		Timeout
 	 */
-	public FlinkClient(final String host, final int port, final Integer timeout) {
+	@SuppressWarnings("rawtypes")
+	public FlinkClient(final Map conf, final String host, final int port, final Integer timeout) {
+		this.conf = conf;
 		this.jobManagerHost = host;
 		this.jobManagerPort = port;
 		if (timeout != null) {
@@ -119,7 +131,7 @@ public class FlinkClient {
 	public static FlinkClient getConfiguredClient(final Map conf) {
 		final String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
 		final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
-		return new FlinkClient(nimbusHost, nimbusPort);
+		return new FlinkClient(conf, nimbusHost, nimbusPort);
 	}
 
 	/**
@@ -133,7 +145,7 @@ public class FlinkClient {
 		return this;
 	}
 
-	public void close() {/* nothing to do */}
+	// The following methods are derived from "backtype.storm.generated.Nimbus.Client"
 
 	/**
 	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
@@ -220,6 +232,8 @@ public class FlinkClient {
 		}
 	}
 
+	// Flink specific additional methods
+
 	/**
 	 * Package internal method to get a Flink {@link JobID} from a Storm topology name.
 	 *
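
The constructor change above threads the Storm configuration through to the client; the factory method still reads host and port from it. A usage sketch (host and port are placeholder values, not taken from this commit):

    Map conf = new HashMap();
    conf.put(Config.NIMBUS_HOST, "localhost");
    conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
    FlinkClient client = FlinkClient.getConfiguredClient(conf);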

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
index 49d73f8..e2f6332 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
@@ -20,10 +20,12 @@ package org.apache.flink.stormcompatibility.api;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
+import java.util.HashMap;
 import java.util.List;
 
 /**
@@ -36,8 +38,8 @@ import java.util.List;
  */
 final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
 
-	/** the declared output schema */
-	Fields outputSchema;
+	/** the declared output streams and schemas */
+	final HashMap<String, Fields> outputStreams = new HashMap<String, Fields>();
 
 	@Override
 	public void declare(final Fields fields) {
@@ -57,15 +59,6 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
 		this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
 	}
 
-	/**
-	 * {@inheritDoc}
-	 * <p/>
-	 * Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equals to
-	 * {@link Utils#DEFAULT_STREAM_ID}.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID}
-	 */
 	@Override
 	public void declareStream(final String streamId, final Fields fields) {
 		this.declareStream(streamId, false, fields);
@@ -74,40 +67,45 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
 	/**
 	 * {@inheritDoc}
 	 * <p/>
-	 * Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equals to
-	 * {@link Utils#DEFAULT_STREAM_ID}. Furthermore, direct emit is no supported by Flink and parameter {@code direct}
-	 * must be {@code false}.
+	 * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
 	 *
 	 * @throws UnsupportedOperationException
-	 * 		if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID} or {@code direct} is {@code true}
+	 * 		if {@code direct} is {@code true}
 	 */
 	@Override
 	public void declareStream(final String streamId, final boolean direct, final Fields fields) {
-		if (!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
-			throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink");
-		}
 		if (direct) {
 			throw new UnsupportedOperationException("Direct emit is not supported by Flink");
 		}
 
-		this.outputSchema = fields;
+		this.outputStreams.put(streamId, fields);
 	}
 
 	/**
-	 * Returns {@link TypeInformation} for the declared output schema. If no or an empty output schema was declared,
-	 * {@code null} is returned.
-	 *
-	 * @return output type information for the declared output schema; or {@code null} if no output schema was declared
+	 * Returns {@link TypeInformation} for the declared output schema for a specific stream.
+	 * 
+	 * @param streamId
+	 *            A stream ID.
+	 * 
+	 * @return output type information for the declared output schema of the specified stream; or {@code null} if
+	 *         {@code streamId == null}
+	 * 
 	 * @throws IllegalArgumentException
-	 * 		if more then 25 attributes are declared
+	 *             If no output schema was declared for the specified stream or if more than 25 attributes were declared.
 	 */
-	public TypeInformation<?> getOutputType() throws IllegalArgumentException {
-		if ((this.outputSchema == null) || (this.outputSchema.size() == 0)) {
+	public TypeInformation<?> getOutputType(final String streamId) throws IllegalArgumentException {
+		if (streamId == null) {
 			return null;
 		}
 
+		Fields outputSchema = this.outputStreams.get(streamId);
+		if (outputSchema == null) {
+			throw new IllegalArgumentException("Stream with ID '" + streamId
+					+ "' was not declared.");
+		}
+
 		Tuple t;
-		final int numberOfAttributes = this.outputSchema.size();
+		final int numberOfAttributes = outputSchema.size();
 
 		if (numberOfAttributes == 1) {
 			return TypeExtractor.getForClass(Object.class);
@@ -148,16 +146,22 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
 	}
 
 	/**
-	 * Computes the indexes within the declared output schema, for a list of given field-grouping attributes.
-	 *
-	 * @return array of {@code int}s that contains the index without the output schema for each attribute in the given
-	 * list
+	 * Computes the indexes within the declared output schema of the specified stream, for a list of given
+	 * field-grouping attributes.
+	 * 
+	 * @param streamId
+	 *            A stream ID.
+	 * @param groupingFields
+	 *            The names of the key fields.
+	 * 
+	 * @return array of {@code int}s that contains the index within the output schema for each attribute in the given
+	 *         list
 	 */
-	public int[] getGroupingFieldIndexes(final List<String> groupingFields) {
+	public int[] getGroupingFieldIndexes(final String streamId, final List<String> groupingFields) {
 		final int[] fieldIndexes = new int[groupingFields.size()];
 
 		for (int i = 0; i < fieldIndexes.length; ++i) {
-			fieldIndexes[i] = this.outputSchema.fieldIndex(groupingFields.get(i));
+			fieldIndexes[i] = this.outputStreams.get(streamId).fieldIndex(groupingFields.get(i));
 		}
 
 		return fieldIndexes;
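
With one schema tracked per stream ID, output-type extraction and grouping-key lookup now work per stream. A sketch of the resulting behavior (the class is package-internal, so this is illustrative only; stream and field names are arbitrary):

    FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
    declarer.declareStream("even", new Fields("number"));
    declarer.declareStream("odd", new Fields("number"));

    // per-stream output type information
    TypeInformation<?> evenType = declarer.getOutputType("even");

    // per-stream key indexes for a fields grouping
    int[] keys = declarer.getGroupingFieldIndexes("odd",
            Collections.singletonList("number"));

    declarer.getOutputType(null);      // returns null
    declarer.getOutputType("unknown"); // throws IllegalArgumentException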

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
index 819dbbc..bcc2afb 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
@@ -125,8 +125,6 @@ public class FlinkSubmitter {
 		} catch (final AlreadyAliveException e) {
 			logger.warn("Topology already alive exception", e);
 			throw e;
-		} finally {
-			client.close();
 		}
 
 		logger.info("Finished submitting topology: " + name);

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index d146250..a739c23 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -33,6 +33,9 @@ import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
 import org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper;
 import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
 import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
@@ -41,6 +44,7 @@ import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -54,8 +58,7 @@ import java.util.Set;
  * topology. Most methods (except {@link #createTopology()}) are copied from the original {@link TopologyBuilder}
  * implementation to ensure equal behavior.<br />
  * <br />
- * <strong>CAUTION: {@link IRichStateSpout StateSpout}s and multiple output streams per spout/bolt are currently not
- * supported.</strong>
+ * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.</strong>
  */
 public class FlinkTopologyBuilder {
 
@@ -65,13 +68,13 @@ public class FlinkTopologyBuilder {
 	private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
 	/** All user bolts by their ID */
 	private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
-	/** All declared output schemas by operator ID */
-	private final HashMap<String, Fields> outputSchemas = new HashMap<String, Fields>();
+	/** All declared streams and output schemas by operator ID */
+	private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
 	/** All spouts&bolts declarers by their ID */
 	private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
 
 	/**
-	 * Creates a Flink program that used the specified spouts and bolts.
+	 * Creates a Flink program that uses the specified spouts and bolts.
 	 */
 	@SuppressWarnings({"rawtypes", "unchecked"})
 	public FlinkTopology createTopology() {
@@ -79,8 +82,7 @@ public class FlinkTopologyBuilder {
 		final FlinkTopology env = new FlinkTopology(stormTopolgoy);
 		env.setParallelism(1);
 
-		final HashMap<String, SingleOutputStreamOperator> availableOperators =
-				new HashMap<String, SingleOutputStreamOperator>();
+		final HashMap<String, HashMap<String, DataStream>> availableInputs = new HashMap<String, HashMap<String, DataStream>>();
 
 		for (final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
 			final String spoutId = spout.getKey();
@@ -88,14 +90,10 @@ public class FlinkTopologyBuilder {
 
 			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
 			userSpout.declareOutputFields(declarer);
-			this.outputSchemas.put(spoutId, declarer.outputSchema);
+			final HashMap<String,Fields> sourceStreams = declarer.outputStreams;
+			this.outputStreams.put(spoutId, sourceStreams);
 			declarers.put(spoutId, declarer);
 
-			/* TODO in order to support multiple output streams, use an additional wrapper (or modify StormSpoutWrapper
-			 * and StormCollector)
-			 * -> add an additional output attribute tagging the output stream, and use .split() and .select() to split
-			 * the streams
-			 */
 			AbstractStormSpoutWrapper spoutWrapper;
 
 			if (userSpout instanceof FiniteStormSpout) {
@@ -104,8 +102,23 @@ public class FlinkTopologyBuilder {
 				spoutWrapper = new StormSpoutWrapper(userSpout);
 			}
 
-			final DataStreamSource source = env.addSource(spoutWrapper, declarer.getOutputType());
-			availableOperators.put(spoutId, source);
+			DataStreamSource source;
+			HashMap<String, DataStream> outputStreams = new HashMap<String, DataStream>();
+			if (sourceStreams.size() == 1) {
+				final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
+				source = env.addSource(spoutWrapper, spoutId,
+						declarer.getOutputType(outputStreamId));
+				outputStreams.put(outputStreamId, source);
+			} else {
+				source = env.addSource(spoutWrapper, spoutId,
+						TypeExtractor.getForClass(SplitStreamType.class));
+				SplitDataStream splitSource = source.split(new FlinkStormStreamSelector());
+
+				for (String streamId : sourceStreams.keySet()) {
+					outputStreams.put(streamId, splitSource.select(streamId));
+				}
+			}
+			availableInputs.put(spoutId, outputStreams);
 
 			int dop = 1;
 			final ComponentCommon common = stormTopolgoy.get_spouts().get(spoutId).get_common();
@@ -126,7 +139,14 @@ public class FlinkTopologyBuilder {
 		 * its producer
 		 * ->thus, we might need to repeat multiple times
 		 */
+		boolean makeProgress = true;
 		while (unprocessedBolts.size() > 0) {
+			if (!makeProgress) {
+				throw new RuntimeException(
+						"Unable to build Topology. Could not connect the following bolts: "
+								+ unprocessedBolts.keySet());
+			}
+			makeProgress = false;
 
 			final Iterator<Entry<String, IRichBolt>> boltsIterator = unprocessedBolts.entrySet().iterator();
 			while (boltsIterator.hasNext()) {
@@ -135,11 +155,6 @@ public class FlinkTopologyBuilder {
 				final String boltId = bolt.getKey();
 				final IRichBolt userBolt = bolt.getValue();
 
-				final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-				userBolt.declareOutputFields(declarer);
-				this.outputSchemas.put(boltId, declarer.outputSchema);
-				declarers.put(boltId, declarer);
-
 				final ComponentCommon common = stormTopolgoy.get_bolts().get(boltId).get_common();
 
 				Set<Entry<GlobalStreamId, Grouping>> unprocessedInputs = unprocessdInputsPerBolt.get(boltId);
@@ -153,51 +168,98 @@ public class FlinkTopologyBuilder {
 				final Iterator<Entry<GlobalStreamId, Grouping>> inputStreamsIterator = unprocessedInputs.iterator();
 				while (inputStreamsIterator.hasNext()) {
 
-					final Entry<GlobalStreamId, Grouping> inputStream = inputStreamsIterator.next();
-					final String producerId = inputStream.getKey().get_componentId();
-
-					DataStream<?> inputDataStream = availableOperators.get(producerId);
-
-					if (inputDataStream != null) {
-						// if producer was processed already
-						final Grouping grouping = inputStream.getValue();
-						if (grouping.is_set_shuffle()) {
-							// Storm uses a round-robin shuffle strategy
-							inputDataStream = inputDataStream.rebalance();
-						} else if (grouping.is_set_fields()) {
-							// global grouping is emulated in Storm via an empty fields grouping list
-							final List<String> fields = grouping.get_fields();
-							if (fields.size() > 0) {
-								FlinkOutputFieldsDeclarer procDeclarer = this.declarers.get(producerId);
-								inputDataStream = inputDataStream.groupBy(procDeclarer.getGroupingFieldIndexes(grouping
-										.get_fields()));
-							} else {
-								inputDataStream = inputDataStream.global();
+					final Entry<GlobalStreamId, Grouping> stormInputStream = inputStreamsIterator.next();
+					final String producerId = stormInputStream.getKey().get_componentId();
+					final String inputStreamId = stormInputStream.getKey().get_streamId();
+
+					HashMap<String, DataStream> producer = availableInputs.get(producerId);
+					if (producer != null) {
+						makeProgress = true;
+
+						DataStream inputStream = producer.get(inputStreamId);
+						if (inputStream != null) {
+							final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+							userBolt.declareOutputFields(declarer);
+							final HashMap<String, Fields> boltOutputStreams = declarer.outputStreams;
+							this.outputStreams.put(boltId, boltOutputStreams);
+							this.declarers.put(boltId, declarer);
+
+							// if producer was processed already
+							final Grouping grouping = stormInputStream.getValue();
+							if (grouping.is_set_shuffle()) {
+								// Storm uses a round-robin shuffle strategy
+								inputStream = inputStream.rebalance();
+							} else if (grouping.is_set_fields()) {
+								// global grouping is emulated in Storm via an empty fields grouping list
+								final List<String> fields = grouping.get_fields();
+								if (fields.size() > 0) {
+									FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
+									inputStream = inputStream.groupBy(prodDeclarer
+											.getGroupingFieldIndexes(inputStreamId,
+													grouping.get_fields()));
+								} else {
+									inputStream = inputStream.global();
+								}
+							} else if (grouping.is_set_all()) {
+								inputStream = inputStream.broadcast();
+							} else if (!grouping.is_set_local_or_shuffle()) {
+								throw new UnsupportedOperationException(
+										"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
 							}
-						} else if (grouping.is_set_all()) {
-							inputDataStream = inputDataStream.broadcast();
-						} else if (!grouping.is_set_local_or_shuffle()) {
-							throw new UnsupportedOperationException(
-									"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
-						}
 
-						final TypeInformation<?> outType = declarer.getOutputType();
+							SingleOutputStreamOperator outputStream;
+							if (boltOutputStreams.size() < 2) { // single output stream or sink
+								String outputStreamId = null;
+								if (boltOutputStreams.size() == 1) {
+									outputStreamId = (String) boltOutputStreams.keySet().toArray()[0];
+								}
+								final TypeInformation<?> outType = declarer
+										.getOutputType(outputStreamId);
+
+								outputStream = inputStream.transform(
+										boltId,
+										outType,
+										new StormBoltWrapper(userBolt, this.outputStreams.get(
+												producerId).get(inputStreamId)));
+
+								if (outType != null) {
+									// only for non-sink nodes
+									HashMap<String, DataStream> op = new HashMap<String, DataStream>();
+									op.put(outputStreamId, outputStream);
+									availableInputs.put(boltId, op);
+								}
+							} else {
+								final TypeInformation<?> outType = TypeExtractor
+										.getForClass(SplitStreamType.class);
+
+								outputStream = inputStream.transform(
+										boltId,
+										outType,
+										new StormBoltWrapper(userBolt, this.outputStreams.get(
+												producerId).get(inputStreamId)));
+
+								SplitDataStream splitStreams = outputStream
+										.split(new FlinkStormStreamSelector());
+
+								HashMap<String, DataStream> op = new HashMap<String, DataStream>();
+								for (String outputStreamId : boltOutputStreams.keySet()) {
+									op.put(outputStreamId, splitStreams.select(outputStreamId));
+								}
+								availableInputs.put(boltId, op);
+							}
 
-						final SingleOutputStreamOperator operator = inputDataStream.transform(boltId, outType,
-								new StormBoltWrapper(userBolt, this.outputSchemas.get(producerId)));
-						if (outType != null) {
-							// only for non-sink nodes
-							availableOperators.put(boltId, operator);
-						}
+							int dop = 1;
+							if (common.is_set_parallelism_hint()) {
+								dop = common.get_parallelism_hint();
+								outputStream.setParallelism(dop);
+							}
+							env.increaseNumberOfTasks(dop);
 
-						int dop = 1;
-						if (common.is_set_parallelism_hint()) {
-							dop = common.get_parallelism_hint();
-							operator.setParallelism(dop);
+							inputStreamsIterator.remove();
+						} else {
+							throw new RuntimeException("Cannot connect '" + boltId + "' to '"
+									+ producerId + "'. Stream '" + inputStreamId + "' not found.");
 						}
-						env.increaseNumberOfTasks(dop);
-
-						inputStreamsIterator.remove();
 					}
 				}
 

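For reference, the grouping translation above condenses to the following mapping from Storm groupings onto Flink DataStream connection patterns. This is a minimal sketch, not the builder's exact code: "input" and "fieldIndexes" are hypothetical stand-ins (the builder derives the indexes via FlinkOutputFieldsDeclarer.getGroupingFieldIndexes(...)).

	import org.apache.flink.streaming.api.datastream.DataStream;
	import backtype.storm.generated.Grouping;

	class GroupingTranslationSketch {
		// maps a Storm grouping onto the corresponding Flink connection pattern
		static DataStream<?> applyGrouping(DataStream<?> input, Grouping grouping, int[] fieldIndexes) {
			if (grouping.is_set_shuffle()) {
				return input.rebalance(); // Storm's shuffle is round-robin
			} else if (grouping.is_set_fields()) {
				// an empty fields list is Storm's encoding of global grouping
				return grouping.get_fields().size() > 0 ? input.groupBy(fieldIndexes) : input.global();
			} else if (grouping.is_set_all()) {
				return input.broadcast();
			} else if (grouping.is_set_local_or_shuffle()) {
				return input; // kept as-is by the builder
			}
			throw new UnsupportedOperationException(
					"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
		}
	}
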
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
new file mode 100644
index 0000000..7ca45d6
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+
+/**
+ * Used by {@link FlinkTopologyBuilder} to split multiple declared output streams within Flink.
+ */
+public final class FlinkStormStreamSelector<T> implements OutputSelector<SplitStreamType<T>> {
+	private static final long serialVersionUID = 2553423379715401023L;
+
+	/** Internal cache to avoid short-lived ArrayList objects. */
+	private final HashMap<String, List<String>> streams = new HashMap<String, List<String>>();
+
+	@Override
+	public Iterable<String> select(SplitStreamType<T> value) {
+		String sid = value.streamId;
+		List<String> streamId = this.streams.get(sid);
+		if (streamId == null) {
+			streamId = new ArrayList<String>(1);
+			streamId.add(sid);
+			this.streams.put(sid, streamId);
+		}
+		return streamId;
+	}
+
+}
\ No newline at end of file

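A self-contained usage sketch for this selector, assuming the streaming API of this Flink version; the wrapped values below are constructed by hand, whereas in a topology they come from the spout/bolt wrappers:

	import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
	import org.apache.flink.stormcompatibility.util.SplitStreamType;
	import org.apache.flink.streaming.api.datastream.DataStream;
	import org.apache.flink.streaming.api.datastream.SplitDataStream;
	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

	public class SelectorSketch {
		public static void main(String[] args) throws Exception {
			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

			// wrap two values the same way AbstractStormCollector does for multiple streams
			SplitStreamType<Integer> even = new SplitStreamType<Integer>();
			even.streamId = "even";
			even.value = 2;
			SplitStreamType<Integer> odd = new SplitStreamType<Integer>();
			odd.streamId = "odd";
			odd.value = 3;

			DataStream<SplitStreamType<Integer>> source = env.fromElements(even, odd);

			// route each record to the logical stream named by its streamId field
			SplitDataStream<SplitStreamType<Integer>> split =
					source.split(new FlinkStormStreamSelector<Integer>());

			split.select("even").print(); // prints <sid:even,v:2>
			env.execute();
		}
	}
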
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
new file mode 100644
index 0000000..afcdcae
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+
+/**
+ * Strips {@link SplitStreamType}{@code <T>} away, ie, extracts the wrapped record of type {@code T}. Can be used to get
+ * a "clean" stream from a Spout/Bolt that declared multiple output streams (after the streams were separated using
+ * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} and
+ * {@link SplitDataStream#select(String...) .select(...)}).
+ * 
+ * @param <T> The type of the wrapped record.
+ */
+public class SplitStreamMapper<T> implements MapFunction<SplitStreamType<T>, T> {
+	private static final long serialVersionUID = 3550359150160908564L;
+
+	@Override
+	public T map(SplitStreamType<T> value) throws Exception {
+		return value.value;
+	}
+
+}

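Continuing the hypothetical "split" stream from the sketch above, this is the strip pattern the mapper enables; depending on the Flink version, an explicit output type hint may be required because the mapper's type argument is erased at runtime:

	// select one logical stream, then unwrap SplitStreamType<Integer> to Integer
	DataStream<Integer> cleanEven = split.select("even")
			.map(new SplitStreamMapper<Integer>());
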
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
new file mode 100644
index 0000000..9c7e477
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Used by {@link org.apache.flink.stormcompatibility.wrappers.AbstractStormCollector AbstractStormCollector} to wrap
+ * output tuples if multiple output streams are declared. For this case, the Flink output data stream must be split via
+ * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} using
+ * {@link FlinkStormStreamSelector}.
+ */
+public class SplitStreamType<T> {
+
+	/** The stream ID this tuple belongs to. */
+	public String streamId;
+	/** The actual data value. */
+	public T value;
+
+	@Override
+	public String toString() {
+		return "<sid:" + this.streamId + ",v:" + this.value + ">";
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		SplitStreamType<?> other = (SplitStreamType<?>) o;
+
+		return this.streamId.equals(other.streamId) && this.value.equals(other.value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
index 4a8fb7d..7b35a64 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
@@ -17,82 +17,114 @@
 package org.apache.flink.stormcompatibility.wrappers;
 
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
+
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map.Entry;
 
 /**
  * A {@link AbstractStormCollector} transforms Storm tuples to Flink tuples.
  */
 abstract class AbstractStormCollector<OUT> {
 
+	/** Flink output tuple of concrete type {@link Tuple0} to {@link Tuple25} per output stream. */
+	protected final HashMap<String, Tuple> outputTuple = new HashMap<String, Tuple>();
+	/** Flink split tuple. Used if multiple output streams are declared. */
+	private final SplitStreamType<Object> splitTuple = new SplitStreamType<Object>();
 	/**
-	 * Flink output tuple of concrete type {@link Tuple1} to {@link Tuple25}.
-	 */
-	protected final Tuple outputTuple;
-	/**
-	 * The number of attributes of the output tuples. (Determines the concrete type of
-	 * {@link #outputTuple}). If {@link #numberOfAttributes} is zero, {@link #outputTuple} is not
-	 * used and "raw" data type is used.
-	 */
-	protected final int numberOfAttributes;
-	/**
-	 * Is set to {@code true} each time a tuple is emitted.
+	 * The number of attributes of the output tuples per stream. (Determines the concrete type of {@link #outputTuple}).
+	 * If an entry of {@link #numberOfAttributes} is negative, {@link #outputTuple} is not used for that stream and
+	 * the "raw" data type is used.
 	 */
+	protected final HashMap<String, Integer> numberOfAttributes;
+	/** Indicates whether multiple output streams are declared and thus {@link SplitStreamType} must be used as output. */
+	private final boolean split;
+	/** Is set to {@code true} each time a tuple is emitted. */
 	boolean tupleEmitted = false;
 
 	/**
-	 * Instantiates a new {@link AbstractStormCollector} that emits Flink tuples via
-	 * {@link #doEmit(Object)}. If the number of attributes is specified as zero, any output type is
-	 * supported. If the number of attributes is between 1 to 25, the output type is {@link Tuple1}
-	 * to {@link Tuple25}.
+	 * Instantiates a new {@link AbstractStormCollector} that emits Flink tuples via {@link #doEmit(Object)}. If the
+	 * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is
+	 * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
 	 * 
 	 * @param numberOfAttributes
-	 * 		The number of attributes of the emitted tuples.
+	 *            The number of attributes of the emitted tuples per output stream.
 	 * @throws UnsupportedOperationException
-	 * 		if the specified number of attributes is not in the valid range of [0,25]
+	 *             if the specified number of attributes is greater than 25
 	 */
-	public AbstractStormCollector(final int numberOfAttributes) throws UnsupportedOperationException {
+	public AbstractStormCollector(final HashMap<String, Integer> numberOfAttributes)
+			throws UnsupportedOperationException {
+		assert (numberOfAttributes != null);
+
 		this.numberOfAttributes = numberOfAttributes;
+		this.split = this.numberOfAttributes.size() > 1;
+
+		for (Entry<String, Integer> outputStream : numberOfAttributes.entrySet()) {
+			final int numAtt = outputStream.getValue();
+			assert (numAtt >= -1);
+
+			if (numAtt > 25) {
+				throw new UnsupportedOperationException(
+						"Flink cannot handle more than 25 attributes, but " + numAtt
+						+ " are declared for stream '" + outputStream.getKey()
+						+ "' by the given bolt");
+			} else if (numAtt >= 0) {
+				try {
+					this.outputTuple.put(outputStream.getKey(),
+							org.apache.flink.api.java.tuple.Tuple.getTupleClass(numAtt)
+							.newInstance());
+				} catch (final InstantiationException e) {
+					throw new RuntimeException(e);
+				} catch (final IllegalAccessException e) {
+					throw new RuntimeException(e);
+				}
 
-		if (this.numberOfAttributes <= 0) {
-			this.outputTuple = null;
-		} else if (this.numberOfAttributes <= 25) {
-			try {
-				this.outputTuple = org.apache.flink.api.java.tuple.Tuple
-						.getTupleClass(this.numberOfAttributes).newInstance();
-			} catch (final InstantiationException e) {
-				throw new RuntimeException(e);
-			} catch (final IllegalAccessException e) {
-				throw new RuntimeException(e);
 			}
-		} else {
-			throw new UnsupportedOperationException(
-					"Flink cannot handle more then 25 attributes, but "
-					+ this.numberOfAttributes + " are declared by the given bolt");
 		}
 	}
 
 	/**
-	 * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via
-	 * {@link #doEmit(Object)}.
+	 * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
+	 * to the specified output stream.
 	 * 
+	 * @param streamId
+	 *            The output stream id.
 	 * @param tuple
-	 * 		The Storm tuple to be emitted.
+	 *            The Storm tuple to be emitted.
 	 * @return the return value of {@link #doEmit(Object)}
 	 */
 	@SuppressWarnings("unchecked")
-	protected final List<Integer> transformAndEmit(final List<Object> tuple) {
+	protected final List<Integer> transformAndEmit(final String streamId, final List<Object> tuple) {
 		List<Integer> taskIds;
-		if (this.numberOfAttributes > 0) {
-			assert (tuple.size() == this.numberOfAttributes);
-			for (int i = 0; i < this.numberOfAttributes; ++i) {
-				this.outputTuple.setField(tuple.get(i), i);
+
+		final int numAtt = this.numberOfAttributes.get(streamId);
+		if (numAtt > -1) {
+			assert (tuple.size() == numAtt);
+			Tuple out = this.outputTuple.get(streamId);
+			for (int i = 0; i < numAtt; ++i) {
+				out.setField(tuple.get(i), i);
+			}
+			if (this.split) {
+				this.splitTuple.streamId = streamId;
+				this.splitTuple.value = out;
+
+				taskIds = doEmit((OUT) this.splitTuple);
+			} else {
+				taskIds = doEmit((OUT) out);
 			}
-			taskIds = doEmit((OUT) this.outputTuple);
+
 		} else {
 			assert (tuple.size() == 1);
-			taskIds = doEmit((OUT) tuple.get(0));
+			if (split) {
+				this.splitTuple.streamId = streamId;
+				this.splitTuple.value = tuple.get(0);
+
+				taskIds = doEmit((OUT) this.splitTuple);
+			} else {
+				taskIds = doEmit((OUT) tuple.get(0));
+			}
 		}
 		this.tupleEmitted = true;
 

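The constructor above boils down to a per-stream lookup from declared arity to a pre-allocated Flink tuple, where a negative arity marks a raw (unwrapped) stream. A standalone sketch of that mechanism, with hypothetical stream names:

	import java.util.HashMap;
	import java.util.Map;

	import org.apache.flink.api.java.tuple.Tuple;

	public class AritySketch {
		public static void main(String[] args) throws Exception {
			Map<String, Integer> arity = new HashMap<String, Integer>();
			arity.put("even", 2); // two attributes -> reuse a single Tuple2 instance
			arity.put("raw", -1); // negative arity -> emit the single value unwrapped

			Map<String, Tuple> outputTuple = new HashMap<String, Tuple>();
			for (Map.Entry<String, Integer> e : arity.entrySet()) {
				if (e.getValue() >= 0) {
					outputTuple.put(e.getKey(), Tuple.getTupleClass(e.getValue()).newInstance());
				}
			}
			System.out.println(outputTuple); // {even=(null,null)}
		}
	}
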
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
index 4e43a8a..62059fe 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
@@ -17,8 +17,13 @@
 
 package org.apache.flink.stormcompatibility.wrappers;
 
+import java.util.Collection;
+import java.util.HashMap;
+
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.topology.IRichSpout;
+
+import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -37,9 +42,9 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF
 	private static final long serialVersionUID = 4993283609095408765L;
 
 	/**
-	 * Number of attributes of the bolt's output tuples.
+	 * Number of attributes of the spout's output tuples per stream.
 	 */
-	private final int numberOfAttributes;
+	private final HashMap<String, Integer> numberOfAttributes;
 	/**
 	 * The wrapped Storm {@link IRichSpout spout}.
 	 */
@@ -55,38 +60,40 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF
 
 	/**
 	 * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
-	 * that it can be used within a Flink streaming program. The output type will be one of {@link Tuple1} to
+	 * that it can be used within a Flink streaming program. The output type will be one of {@link Tuple0} to
 	 * {@link Tuple25} depending on the spout's declared number of attributes.
 	 *
 	 * @param spout
 	 * 		The Storm {@link IRichSpout spout} to be used.
 	 * @throws IllegalArgumentException
-	 * 		If the number of declared output attributes is not with range [1;25].
+	 * 		If the number of declared output attributes is not within range [0;25].
 	 */
 	public AbstractStormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
-		this(spout, false);
+		this(spout, null);
 	}
 
 	/**
 	 * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
 	 * that it can be used within a Flink streaming program. The output type can be any type if parameter
 	 * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
-	 * {@code false} the output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared
+	 * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
 	 * number of attributes.
-	 *
+	 * 
 	 * @param spout
-	 * 		The Storm {@link IRichSpout spout} to be used.
-	 * @param rawOutput
-	 * 		Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 * 		of a raw type.
+	 *            The Storm {@link IRichSpout spout} to be used.
+	 * @param rawOutputs
+	 *            Contains the names of all output streams that carry a single attribute and should be emitted as a raw
+	 *            type instead of {@link Tuple1}.
 	 * @throws IllegalArgumentException
-	 * 		If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 * 		{@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 * 		[1;25].
+	 *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 *             {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 *             [0;25].
 	 */
-	public AbstractStormSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
+	public AbstractStormSpoutWrapper(final IRichSpout spout,
+			final Collection<String> rawOutputs)
+					throws IllegalArgumentException {
 		this.spout = spout;
-		this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutput);
+		this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs);
 	}
 
 	@Override
@@ -94,7 +101,7 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF
 		this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
 		this.spout.open(null,
 				StormWrapperSetupHelper
-						.convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
+				.convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
 				new SpoutOutputCollector(this.collector));
 		this.spout.activate();
 		this.execute();

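As a usage sketch (not part of this commit): a spout wrapper plugs into a regular Flink program as a source. MySpout is a hypothetical IRichSpout declaring one Integer attribute on Storm's default stream "default"; depending on the Flink version, an explicit TypeInformation may have to be supplied, as FlinkTopologyBuilder does above via declarer.getOutputType(...).

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	// raw Integer output, because the spout's only stream "default" is listed in rawOutputs;
	// uses the StormFiniteSpoutWrapper subclass shown further below
	DataStream<Integer> numbers = env.addSource(
			new StormFiniteSpoutWrapper<Integer>(new MySpout(), new String[] { "default" }, 1000));
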
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
index 7913510..1912afc 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
@@ -17,6 +17,14 @@
 
 package org.apache.flink.stormcompatibility.wrappers;
 
+import java.util.Collection;
+
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+
+import com.google.common.collect.Sets;
+
 /**
  * A {@link FiniteStormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped
  * {@link FiniteStormSpout}'s {@link FiniteStormSpout#nextTuple()} method until {@link
@@ -28,16 +36,14 @@ public class FiniteStormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT>
 	private FiniteStormSpout finiteSpout;
 
 	/**
-	 * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link
-	 * FiniteStormSpout spout} such that it can be used within a Flink streaming program. The
-	 * output
-	 * type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared
-	 * number of attributes.
-	 *
+	 * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such
+	 * that it can be used within a Flink streaming program. The output type will be one of {@link Tuple0} to
+	 * {@link Tuple25} depending on the spout's declared number of attributes.
+	 * 
 	 * @param spout
-	 * 		The Storm {@link FiniteStormSpout spout} to be used. @throws
-	 * 		IllegalArgumentException If
-	 * 		the number of declared output attributes is not with range [1;25].
+	 *            The Storm {@link FiniteStormSpout spout} to be used.
+	 * @throws IllegalArgumentException
+	 *             If the number of declared output attributes is not within range [0;25].
 	 */
 	public FiniteStormSpoutWrapper(FiniteStormSpout spout)
 			throws IllegalArgumentException {
@@ -46,36 +52,53 @@ public class FiniteStormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT>
 	}
 
 	/**
-	 * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link
-	 * FiniteStormSpout spout} such that it can be used within a Flink streaming program. The
-	 * output
-	 * type can be any type if parameter {@code rawOutput} is {@code true} and the spout's
-	 * number of
-	 * declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be
-	 * one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
-	 * attributes.
-	 *
+	 * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such
+	 * that it can be used within a Flink streaming program. The output type can be any type if parameter
+	 * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
+	 * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
+	 * number of attributes.
+	 * 
+	 * @param spout
+	 *            The Storm {@link FiniteStormSpout spout} to be used.
+	 * @param rawOutputs
+	 *            Contains the names of all output streams that carry a single attribute and should be emitted as a raw
+	 *            type instead of {@link Tuple1}.
+	 * @throws IllegalArgumentException
+	 *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 *             {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 *             [0;25].
+	 */
+	public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final String[] rawOutputs)
+			throws IllegalArgumentException {
+		this(spout, Sets.newHashSet(rawOutputs));
+	}
+
+	/**
+	 * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such
+	 * that it can be used within a Flink streaming program. The output type can be any type if parameter
+	 * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
+	 * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
+	 * number of attributes.
+	 * 
 	 * @param spout
-	 * 		The Storm {@link FiniteStormSpout spout} to be used.
-	 * @param rawOutput
-	 * 		Set to {@code true} if a single attribute output stream, should not be of type {@link
-	 * 		Tuple1} but be of a raw type.
+	 *            The Storm {@link FiniteStormSpout spout} to be used.
+	 * @param rawOutputs
+	 *            Contains the names of all output streams that carry a single attribute and should be emitted as a raw
+	 *            type instead of {@link Tuple1}.
 	 * @throws IllegalArgumentException
-	 * 		If {@code rawOuput} is {@code true} and the number of declared output attributes is
-	 * 		not 1
-	 * 		or if {@code rawOuput} is {@code false} and the number of declared output attributes
-	 * 		is not
-	 * 		with range [1;25].
+	 *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 *             {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 *             [0;25].
 	 */
-	public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final boolean rawOutput)
+	public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final Collection<String> rawOutputs)
 			throws IllegalArgumentException {
-		super(spout, rawOutput);
+		super(spout, rawOutputs);
 		this.finiteSpout = spout;
 	}
 
 	/**
-	 * Calls the {@link FiniteStormSpout#nextTuple()} method until {@link
-	 * FiniteStormSpout#reachedEnd()} is true or {@link FiniteStormSpout#cancel()} is called.
+	 * Calls the {@link FiniteStormSpout#nextTuple()} method until {@link FiniteStormSpout#reachedEnd()} is true or
+	 * {@link FiniteStormSpout#cancel()} is called.
 	 */
 	@Override
 	protected void execute() {

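The practical difference the rawOutputs parameter makes, sketched with a hypothetical FiniteStormSpout "spout" that declares one Integer attribute on Storm's default stream "default":

	// output typed as Tuple1<Integer>:
	FiniteStormSpoutWrapper<Tuple1<Integer>> tupleOutput =
			new FiniteStormSpoutWrapper<Tuple1<Integer>>(spout);
	// output typed as raw Integer, because the stream is listed in rawOutputs:
	FiniteStormSpoutWrapper<Integer> rawOutput =
			new FiniteStormSpoutWrapper<Integer>(spout, new String[] { "default" });
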
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
index 81ad9a6..e810214 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
@@ -20,12 +20,13 @@ package org.apache.flink.stormcompatibility.wrappers;
 import backtype.storm.task.IOutputCollector;
 import backtype.storm.tuple.Tuple;
 
-import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.util.Collector;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 
 /**
@@ -39,19 +40,19 @@ class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOu
 	private final Collector<OUT> flinkOutput;
 
 	/**
-	 * Instantiates a new {@link StormBoltCollector} that emits Flink tuples to the given Flink
-	 * output object. If the number of attributes is specified as zero, any output type is
-	 * supported. If the number of attributes is between 1 to 25, the output type is {@link Tuple1}
-	 * to {@link Tuple25}.
+	 * Instantiates a new {@link StormBoltCollector} that emits Flink tuples to the given Flink output object. If the
+	 * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is
+	 * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
 	 * 
 	 * @param numberOfAttributes
-	 *        The number of attributes of the emitted tuples.
+	 *            The number of attributes of the emitted tuples per output stream.
 	 * @param flinkOutput
-	 *        The Flink output object to be used.
+	 *            The Flink output object to be used.
 	 * @throws UnsupportedOperationException
-	 *         if the specified number of attributes is not in the valid range of [0,25]
+	 *             if the specified number of attributes is greater than 25
 	 */
-	public StormBoltCollector(final int numberOfAttributes, final Collector<OUT> flinkOutput) throws UnsupportedOperationException {
+	public StormBoltCollector(final HashMap<String, Integer> numberOfAttributes,
+			final Collector<OUT> flinkOutput) throws UnsupportedOperationException {
 		super(numberOfAttributes);
 		assert (flinkOutput != null);
 		this.flinkOutput = flinkOutput;
@@ -72,7 +73,7 @@ class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOu
 
 	@Override
 	public List<Integer> emit(final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
-		return this.transformAndEmit(tuple);
+		return this.transformAndEmit(streamId, tuple);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
index 8bcdae0..05a4902 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
@@ -16,23 +16,26 @@
  */
 package org.apache.flink.stormcompatibility.wrappers;
 
+import java.util.Collection;
+import java.util.HashMap;
+
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.tuple.Fields;
 
+import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-
-
-
+import com.google.common.collect.Sets;
 
 /**
  * A {@link StormBoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
@@ -48,10 +51,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
 	private static final long serialVersionUID = -4788589118464155835L;
 
-	/** The wrapped Storm {@link IRichBolt bolt} */
+	/** The wrapped Storm {@link IRichBolt bolt}. */
 	private final IRichBolt bolt;
-	/** Number of attributes of the bolt's output tuples */
-	private final int numberOfAttributes;
+	/** Number of attributes of the bolt's output tuples per stream. */
+	private final HashMap<String, Integer> numberOfAttributes;
 	/** The schema (ie, ordered field names) of the input stream. */
 	private final Fields inputSchema;
 
@@ -64,34 +67,34 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 	/**
 	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
 	 * used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible
-	 * for POJO input types. The output type will be one of {@link Tuple1} to {@link Tuple25} depending on the bolt's
+	 * for POJO input types. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's
 	 * declared number of attributes.
 	 * 
 	 * @param bolt
-	 * 		The Storm {@link IRichBolt bolt} to be used.
+	 *            The Storm {@link IRichBolt bolt} to be used.
 	 * @throws IllegalArgumentException
-	 * 		If the number of declared output attributes is not with range [1;25].
+	 *             If the number of declared output attributes is not within range [0;25].
 	 */
 	public StormBoltWrapper(final IRichBolt bolt) throws IllegalArgumentException {
-		this(bolt, null, false);
+		this(bolt, null, (Collection<String>) null);
 	}
 
 	/**
 	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
 	 * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
-	 * {@link Tuple1} to {@link Tuple25}. The output type will be one of {@link Tuple1} to {@link Tuple25} depending on
+	 * {@link Tuple0} to {@link Tuple25}. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on
 	 * the bolt's declared number of attributes.
 	 * 
 	 * @param bolt
-	 * 		The Storm {@link IRichBolt bolt} to be used.
+	 *            The Storm {@link IRichBolt bolt} to be used.
 	 * @param inputSchema
-	 * 		The schema (ie, ordered field names) of the input stream.
+	 *            The schema (ie, ordered field names) of the input stream.
 	 * @throws IllegalArgumentException
-	 * 		If the number of declared output attributes is not with range [1;25].
+	 *             If the number of declared output attributes is not within range [0;25].
 	 */
 	public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema)
 			throws IllegalArgumentException {
-		this(bolt, inputSchema, false);
+		this(bolt, inputSchema, (Collection<String>) null);
 	}
 
 	/**
@@ -99,47 +102,93 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 	 * used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible
 	 * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the
 	 * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one
-	 * of {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
+	 * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
 	 * 
 	 * @param bolt
-	 * 		The Storm {@link IRichBolt bolt} to be used.
-	 * @param rawOutput
-	 * 		Set to {@code true} if a single attribute output stream, should not be of type
-	 * 		{@link Tuple1} but be of a raw type.
+	 *            The Storm {@link IRichBolt bolt} to be used.
+	 * @param rawOutputs
+	 *            Contains the names of all output streams that carry a single attribute and should be emitted as a raw
+	 *            type instead of {@link Tuple1}.
 	 * @throws IllegalArgumentException
-	 * 		If {@code rawOuput} is {@code true} and the number of declared output attributes is
-	 * 		not 1 or if {@code rawOuput} is {@code false} and the number of declared output
-	 * 		attributes is not with range [1;25].
+	 *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 *             {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 *             [0;25].
 	 */
-	public StormBoltWrapper(final IRichBolt bolt, final boolean rawOutput)
+	public StormBoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
 			throws IllegalArgumentException {
-		this(bolt, null, rawOutput);
+		this(bolt, null, Sets.newHashSet(rawOutputs));
+	}
+
+	/**
+	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
+	 * used within a Flink streaming program. As no input schema is defined, attribute-by-name access is only possible
+	 * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the
+	 * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one
+	 * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
+	 * 
+	 * @param bolt
+	 *            The Storm {@link IRichBolt bolt} to be used.
+	 * @param rawOutputs
+	 *            Contains the names of all output streams that carry a single attribute and should be emitted as a raw
+	 *            type instead of {@link Tuple1}.
+	 * @throws IllegalArgumentException
+	 *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 *             {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 *             [0;25].
+	 */
+	public StormBoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs)
+			throws IllegalArgumentException {
+		this(bolt, null, rawOutputs);
 	}
 
 	/**
 	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
 	 * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
-	 * {@link Tuple1} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
+	 * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
 	 * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
-	 * be one of {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
+	 * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
 	 * 
 	 * @param bolt
-	 * 		The Storm {@link IRichBolt bolt} to be used.
+	 *            The Storm {@link IRichBolt bolt} to be used.
 	 * @param inputSchema
-	 * 		The schema (ie, ordered field names) of the input stream.
-	 * @param rawOutput
-	 * 		Set to {@code true} if a single attribute output stream, should not be of type
-	 * 		{@link Tuple1} but be of a raw type.
+	 *            The schema (ie, ordered field names) of the input stream.
+	 * @param rawOutputs
+	 *            Contains the names of all output streams that carry a single attribute and should be emitted as a raw
+	 *            type instead of {@link Tuple1}.
 	 * @throws IllegalArgumentException
-	 * 		If {@code rawOuput} is {@code true} and the number of declared output attributes is
-	 * 		not 1 or if {@code rawOuput} is {@code false} and the number of declared output
-	 * 		attributes is not with range [1;25].
+	 *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 *             {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 *             [0;25].
 	 */
-	public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema, final boolean rawOutput)
-			throws IllegalArgumentException {
+	public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema,
+			final String[] rawOutputs) throws IllegalArgumentException {
+		this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
+	}
+
+	/**
+	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
+	 * used within a Flink streaming program. The given input schema enables attribute-by-name access for input types
+	 * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
+	 * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
+	 * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
+	 * 
+	 * @param bolt
+	 *            The Storm {@link IRichBolt bolt} to be used.
+	 * @param inputSchema
+	 *            The schema (ie, ordered field names) of the input stream.
+	 * @param rawOutputs
+	 *            Contains the names of all output streams that carry a single attribute and should be emitted as a raw
+	 *            type instead of {@link Tuple1}.
+	 * @throws IllegalArgumentException
+	 *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 *             {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 *             [0;25].
+	 */
+	public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema,
+			final Collection<String> rawOutputs) throws IllegalArgumentException {
 		this.bolt = bolt;
 		this.inputSchema = inputSchema;
-		this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutput);
+		this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
 	}
 
 	@Override
@@ -151,7 +200,7 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 		flinkCollector = new TimestampedCollector<OUT>(output);
 		OutputCollector stormCollector = null;
 
-		if (this.numberOfAttributes != -1) {
+		if (this.numberOfAttributes.size() > 0) {
 			stormCollector = new OutputCollector(new StormBoltCollector<OUT>(
 					this.numberOfAttributes, flinkCollector));
 		}
@@ -165,10 +214,17 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 		this.bolt.cleanup();
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
 	public void processElement(final StreamRecord<IN> element) throws Exception {
 		flinkCollector.setTimestamp(element.getTimestamp());
-		this.bolt.execute(new StormTuple<IN>(element.getValue(), inputSchema));
+		IN value = element.getValue();
+		if (value instanceof SplitStreamType) {
+			this.bolt.execute(new StormTuple<IN>(((SplitStreamType<IN>) value).value,
+					inputSchema));
+		} else {
+			this.bolt.execute(new StormTuple<IN>(value, inputSchema));
+		}
 	}
 
 	@Override

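Outside of FlinkTopologyBuilder, the wrapper attaches to any DataStream via transform(), mirroring the builder code above. A hedged sketch: input, MyBolt, and outputType are hypothetical, and the TypeInformation must match the bolt's declared output fields (cf. FlinkOutputFieldsDeclarer.getOutputType(...)).

	DataStream<Tuple2<String, Integer>> out = input.transform(
			"myBolt",   // operator name
			outputType, // TypeInformation<Tuple2<String, Integer>>
			new StormBoltWrapper<Tuple1<Integer>, Tuple2<String, Integer>>(
					new MyBolt(), new Fields("number")));
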

[2/4] flink git commit: [FLINK-2306] Add support for named streams in Storm compatibility layer - enabled .declareStream() and connect via stream name - enabled multiple output streams - added .split() / .select() / strip pattern - added helpers in n

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
index 44b3f68..45eb56c 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
@@ -17,17 +17,22 @@
 
 package org.apache.flink.stormcompatibility.wrappers;
 
+import java.util.Collection;
+
 import backtype.storm.topology.IRichSpout;
+
+import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 
+import com.google.common.collect.Sets;
+
 /**
- * A {@link StormFiniteSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls
- * {@link IRichSpout#nextTuple() nextTuple()} for finite number of times before
- * {@link #run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext)}
- * returns. The number of {@code nextTuple()} calls can be specified as a certain number of
- * invocations or can be undefined. In the undefined case, the {@code run(...)} method return if no
- * record was emitted to the output collector for the first time.
+ * A {@link StormFiniteSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls {@link IRichSpout#nextTuple()
+ * nextTuple()} for a finite number of times before
+ * {@link #run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext)} returns. The number of
+ * {@code nextTuple()} calls can be specified as a certain number of invocations or can be undefined. In the undefined
+ * case, the {@code run(...)} method returns the first time no record is emitted to the output collector.
  */
 public class StormFiniteSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
 	private static final long serialVersionUID = 3883246587044801286L;
@@ -38,79 +43,126 @@ public class StormFiniteSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT>
 	/**
 	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
 	 * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
-	 * output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
+	 * output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared number of
 	 * attributes.
-	 *
+	 * 
 	 * @param spout
-	 * 		The Storm {@link IRichSpout spout} to be used.
+	 *            The Storm {@link IRichSpout spout} to be used.
 	 * @throws IllegalArgumentException
-	 * 		If the number of declared output attributes is not with range [1;25].
+	 *             If the number of declared output attributes is not within range [0;25].
 	 */
 	public StormFiniteSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
-		this(spout, false, -1);
+		this(spout, (Collection<String>) null, -1);
 	}
 
 	/**
 	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
-	 * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type will be
-	 * one
-	 * of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of attributes.
-	 *
+	 * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type will be one
+	 * of {@link Tuple0} to {@link Tuple25} depending on the spout's declared number of attributes.
+	 * 
 	 * @param spout
-	 * 		The Storm {@link IRichSpout spout} to be used.
+	 *            The Storm {@link IRichSpout spout} to be used.
 	 * @param numberOfInvocations
-	 * 		The number of calls to {@link IRichSpout#nextTuple()}.
+	 *            The number of calls to {@link IRichSpout#nextTuple()}.
 	 * @throws IllegalArgumentException
-	 * 		If the number of declared output attributes is not with range [1;25].
+	 *             If the number of declared output attributes is not within range [0;25].
 	 */
 	public StormFiniteSpoutWrapper(final IRichSpout spout, final int numberOfInvocations)
 			throws IllegalArgumentException {
-		this(spout, false, numberOfInvocations);
+		this(spout, (Collection<String>) null, numberOfInvocations);
+	}
+
+	/**
+	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
+	 * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
+	 * output type can be any type if parameter {@code rawOutput} is {@code true} and the spout's number of declared
+	 * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to
+	 * {@link Tuple25} depending on the spout's declared number of attributes.
+	 * 
+	 * @param spout
+	 *            The Storm {@link IRichSpout spout} to be used.
+	 * @param rawOutputs
+	 *            Contains the names of all output streams that carry a single attribute and should be emitted as a raw
+	 *            type instead of {@link Tuple1}.
+	 * @throws IllegalArgumentException
+	 *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 *             {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 *             [0;25].
+	 */
+	public StormFiniteSpoutWrapper(final IRichSpout spout, final String[] rawOutputs)
+			throws IllegalArgumentException {
+		this(spout, Sets.newHashSet(rawOutputs), -1);
 	}
 
 	/**
 	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
 	 * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
 	 * output type can be any type if parameter {@code rawOutput} is {@code true} and the spout's number of declared
-	 * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to
+	 * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to
 	 * {@link Tuple25} depending on the spout's declared number of attributes.
-	 *
+	 * 
 	 * @param spout
-	 * 		The Storm {@link IRichSpout spout} to be used.
-	 * @param rawOutput
-	 * 		Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 * 		of a raw type.
+	 *            The Storm {@link IRichSpout spout} to be used.
+	 * @param rawOutputs
+	 *            Contains the names of all streams for which the single attribute output should not be of type
+	 *            {@link Tuple1} but of a raw type.
 	 * @throws IllegalArgumentException
-	 * 		If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 * 		{@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 * 		[1;25].
+	 *             If a stream is listed in {@code rawOutputs} and its number of declared output attributes is not 1,
+	 *             or if a stream is not listed in {@code rawOutputs} and its number of declared output attributes is
+	 *             not within range [0;25].
 	 */
-	public StormFiniteSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
-		this(spout, rawOutput, -1);
+	public StormFiniteSpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs)
+			throws IllegalArgumentException {
+		this(spout, rawOutputs, -1);
 	}
 
 	/**
 	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
 	 * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type can be any
 	 * type if parameter {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If
-	 * {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to {@link Tuple25} depending on
+	 * {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on
 	 * the spout's declared number of attributes.
-	 *
+	 * 
 	 * @param spout
-	 * 		The Storm {@link IRichSpout spout} to be used.
-	 * @param rawOutput
-	 * 		Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 * 		of a raw type.
+	 *            The Storm {@link IRichSpout spout} to be used.
+	 * @param rawOutputs
+	 *            Contains the names of all streams for which the single attribute output should not be of type
+	 *            {@link Tuple1} but of a raw type.
 	 * @param numberOfInvocations
-	 * 		The number of calls to {@link IRichSpout#nextTuple()}.
+	 *            The number of calls to {@link IRichSpout#nextTuple()}.
 	 * @throws IllegalArgumentException
-	 * 		If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 * 		{@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 * 		[1;25].
+	 *             If a stream is listed in {@code rawOutputs} and its number of declared output attributes is not 1,
+	 *             or if a stream is not listed in {@code rawOutputs} and its number of declared output attributes is
+	 *             not within range [0;25].
 	 */
-	public StormFiniteSpoutWrapper(final IRichSpout spout, final boolean rawOutput, final int numberOfInvocations)
-			throws IllegalArgumentException {
-		super(spout, rawOutput);
+	public StormFiniteSpoutWrapper(final IRichSpout spout, final String[] rawOutputs,
+			final int numberOfInvocations) throws IllegalArgumentException {
+		super(spout, Sets.newHashSet(rawOutputs));
+		this.numberOfInvocations = numberOfInvocations;
+	}
+
+	/**
+	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
+	 * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type of a
+	 * stream can be any type if the stream is listed in {@code rawOutputs} and declares a single attribute.
+	 * Otherwise, the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the stream's declared
+	 * number of attributes.
+	 * 
+	 * @param spout
+	 *            The Storm {@link IRichSpout spout} to be used.
+	 * @param rawOutputs
+	 *            Contains the names of all streams for which the single attribute output should not be of type
+	 *            {@link Tuple1} but of a raw type.
+	 * @param numberOfInvocations
+	 *            The number of calls to {@link IRichSpout#nextTuple()}.
+	 * @throws IllegalArgumentException
+	 *             If a stream is listed in {@code rawOutputs} and its number of declared output attributes is not 1,
+	 *             or if a stream is not listed in {@code rawOutputs} and its number of declared output attributes is
+	 *             not within range [0;25].
+	 */
+	public StormFiniteSpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs,
+			final int numberOfInvocations) throws IllegalArgumentException {
+		super(spout, rawOutputs);
 		this.numberOfInvocations = numberOfInvocations;
 	}
 

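A minimal usage sketch for the new constructors (NumberSpout and the stream name "odd" are hypothetical placeholders for a user-defined spout and one of its declared streams):

    // All declared streams are emitted as Flink tuple types; stop after 10 calls.
    StormFiniteSpoutWrapper<Tuple1<Integer>> tupleSource =
            new StormFiniteSpoutWrapper<Tuple1<Integer>>(new NumberSpout(), 10);

    // Run until the spout stops emitting; the single attribute of stream "odd"
    // is emitted as a raw Integer instead of Tuple1<Integer>.
    StormFiniteSpoutWrapper<Integer> rawSource =
            new StormFiniteSpoutWrapper<Integer>(new NumberSpout(), new String[] {"odd"});
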
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
index 231cab6..f33d4d3 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
@@ -17,18 +17,21 @@
 
 package org.apache.flink.stormcompatibility.wrappers;
 
+import java.util.HashMap;
+
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
 
 /**
- * {@link StormOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the
- * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)} method.
+ * {@link StormOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the output streams and
+ * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)}/{@code declareStream(...)}
+ * method.
  */
 class StormOutputFieldsDeclarer implements OutputFieldsDeclarer {
 
-	/** The output schema declared by the wrapped bolt. */
-	private Fields outputSchema = null;
+	/** The number of attributes for each declared stream by the wrapped operator. */
+	HashMap<String, Integer> outputSchemas = new HashMap<String, Integer>();
 
 	@Override
 	public void declare(final Fields fields) {
@@ -47,28 +50,14 @@ class StormOutputFieldsDeclarer implements OutputFieldsDeclarer {
 
 	@Override
 	public void declareStream(final String streamId, final boolean direct, final Fields fields) {
-		if (!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
-			throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink");
+		if (streamId == null) {
+			throw new IllegalArgumentException("Stream ID cannot be null.");
 		}
 		if (direct) {
 			throw new UnsupportedOperationException("Direct emit is not supported by Flink");
 		}
 
-		this.outputSchema = fields;
-	}
-
-	/**
-	 * Returns the number of attributes of the output schema declare by the wrapped bolt. If no output schema is
-	 * declared (eg, for sink bolts), {@code -1} is returned.
-	 *
-	 * @return the number of attributes of the output schema declare by the wrapped bolt
-	 */
-	public int getNumberOfAttributes() {
-		if (this.outputSchema != null) {
-			return this.outputSchema.size();
-		}
-
-		return -1;
+		this.outputSchemas.put(streamId, fields.size());
 	}
 
 }

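A sketch of the new bookkeeping; note the class is package-private, so this only compiles inside the wrappers package, and the stream name "error" is made up:

    StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
    declarer.declare(new Fields("id", "value"));         // default stream -> 2 attributes
    declarer.declareStream("error", new Fields("msg"));  // named stream   -> 1 attribute
    // declarer.outputSchemas now maps Utils.DEFAULT_STREAM_ID -> 2 and "error" -> 1
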
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
index 09a7ac7..5a20056 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
@@ -19,9 +19,11 @@ package org.apache.flink.stormcompatibility.wrappers;
 
 import backtype.storm.spout.ISpoutOutputCollector;
 
-import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+
+import java.util.HashMap;
 import java.util.List;
 
 /**
@@ -31,23 +33,23 @@ import java.util.List;
  */
 class StormSpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector {
 
-	// The Flink source context object
+	/** The Flink source context object */
 	private final SourceContext<OUT> flinkContext;
 
 	/**
-	 * Instantiates a new {@link StormSpoutCollector} that emits Flink tuples to the given Flink
-	 * source context. If the number of attributes is specified as zero, any output type is
-	 * supported. If the number of attributes is between 1 to 25, the output type is {@link Tuple1}
-	 * to {@link Tuple25}.
+	 * Instantiates a new {@link StormSpoutCollector} that emits Flink tuples to the given Flink source context. If the
+	 * number of attributes of a stream is specified as {@code -1}, any output type is supported (raw type). If the
+	 * number of attributes is between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
 	 * 
 	 * @param numberOfAttributes
-	 *        The number of attributes of the emitted tuples.
+	 *            The number of attributes of the emitted tuples per output stream.
 	 * @param flinkContext
-	 *        The Flink source context to be used.
+	 *            The Flink source context to be used.
 	 * @throws UnsupportedOperationException
-	 *         if the specified number of attributes is not in the valid range of [0,25]
+	 *             if the specified number of attributes is greater than 25
 	 */
-	public StormSpoutCollector(final int numberOfAttributes, final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
+	public StormSpoutCollector(final HashMap<String, Integer> numberOfAttributes,
+			final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
 		super(numberOfAttributes);
 		assert (flinkContext != null);
 		this.flinkContext = flinkContext;
@@ -68,7 +70,7 @@ class StormSpoutCollector<OUT> extends AbstractStormCollector<OUT> implements IS
 
 	@Override
 	public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
-		return this.transformAndEmit(tuple);
+		return this.tansformAndEmit(streamId, tuple);
 	}
 
 

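A sketch of the changed constructor contract; flinkContext stands for a real SourceContext, and an attribute count of -1 marks a stream whose single attribute is emitted as a raw value:

    HashMap<String, Integer> schemas = new HashMap<String, Integer>();
    schemas.put(Utils.DEFAULT_STREAM_ID, 2); // emitted as Tuple2
    schemas.put("raw", -1);                  // emitted as raw (non-Tuple) values
    StormSpoutCollector<Object> collector =
            new StormSpoutCollector<Object>(schemas, flinkContext);
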
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
index ab9a890..300b241 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
@@ -17,10 +17,16 @@
 
 package org.apache.flink.stormcompatibility.wrappers;
 
+import java.util.Collection;
+
 import backtype.storm.topology.IRichSpout;
+
+import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 
+import com.google.common.collect.Sets;
+
 /**
  * A {@link StormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped spout's
 * {@link IRichSpout#nextTuple() nextTuple()} method in an infinite loop.
@@ -29,39 +35,61 @@ public class StormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
 	private static final long serialVersionUID = -218340336648247605L;
 
 	/**
-	 * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it
-	 * can
-	 * be used within a Flink streaming program. The output type will be one of {@link Tuple1} to {@link Tuple25}
+	 * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can
+	 * be used within a Flink streaming program. The output type will be one of {@link Tuple0} to {@link Tuple25}
 	 * depending on the spout's declared number of attributes.
-	 *
+	 * 
 	 * @param spout
-	 * 		The Storm {@link IRichSpout spout} to be used.
+	 *            The Storm {@link IRichSpout spout} to be used.
 	 * @throws IllegalArgumentException
-	 * 		If the number of declared output attributes is not with range [1;25].
+	 *             If the number of declared output attributes is not within range [0;25].
 	 */
 	public StormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
-		super(spout, false);
+		super(spout, null);
+	}
+
+	/**
+	 * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can
+	 * be used within a Flink streaming program. The output type of a stream can be any type if the stream is listed
+	 * in {@code rawOutputs} and declares a single attribute. Otherwise, the output type will be one of {@link Tuple0}
+	 * to {@link Tuple25} depending on the stream's declared number of
+	 * attributes.
+	 * 
+	 * @param spout
+	 *            The Storm {@link IRichSpout spout} to be used.
+	 * @param rawOutputs
+	 *            Contains the names of all streams for which the single attribute output should not be of type
+	 *            {@link Tuple1} but of a raw type. (Can be {@code null}.)
+	 * @throws IllegalArgumentException
+	 *             If a stream is listed in {@code rawOutputs} and its number of declared output attributes is not 1,
+	 *             or if a stream is not listed in {@code rawOutputs} and its number of declared output attributes is
+	 *             not within range [0;25].
+	 */
+	public StormSpoutWrapper(final IRichSpout spout, final String[] rawOutputs)
+			throws IllegalArgumentException {
+		super(spout, Sets.newHashSet(rawOutputs));
 	}
 
 	/**
-	 * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it
-	 * can be used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
+	 * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can
+	 * be used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
 	 * {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the
-	 * output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
+	 * output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared number of
 	 * attributes.
-	 *
+	 * 
 	 * @param spout
-	 * 		The Storm {@link IRichSpout spout} to be used.
-	 * @param rawOutput
-	 * 		Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 * 		of a raw type.
+	 *            The Storm {@link IRichSpout spout} to be used.
+	 * @param rawOutputs
+	 *            Contains the names of all streams for which the single attribute output should not be of type
+	 *            {@link Tuple1} but of a raw type. (Can be {@code null}.)
 	 * @throws IllegalArgumentException
-	 * 		If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 * 		{@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 * 		[1;25].
+	 *             If a stream is listed in {@code rawOutputs} and its number of declared output attributes is not 1,
+	 *             or if a stream is not listed in {@code rawOutputs} and its number of declared output attributes is
+	 *             not within range [0;25].
 	 */
-	public StormSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
-		super(spout, rawOutput);
+	public StormSpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs)
+			throws IllegalArgumentException {
+		super(spout, rawOutputs);
 	}
 
 	/**

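Usage mirrors the finite wrapper; a sketch with a hypothetical MySpout:

    // All streams are emitted as Flink tuple types:
    StormSpoutWrapper<Tuple2<String, Integer>> source =
            new StormSpoutWrapper<Tuple2<String, Integer>>(new MySpout());

    // The single attribute of stream "numbers" is emitted as a raw Integer:
    StormSpoutWrapper<Integer> rawSource =
            new StormSpoutWrapper<Integer>(new MySpout(), new String[] {"numbers"});
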
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
index e2e303a..75ab8e0 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
@@ -25,11 +25,14 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IComponent;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.IRichSpout;
+
 import org.apache.flink.stormcompatibility.api.FlinkTopologyContext;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * {@link StormWrapperSetupHelper} is an helper class used by {@link AbstractStormSpoutWrapper} or
@@ -38,52 +41,47 @@ import java.util.Map;
 class StormWrapperSetupHelper {
 
 	/**
-	 * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or
-	 * {@link StormBoltWrapper}. Returns zero for raw output type or a value within range [1;25] for
-	 * output type {@link org.apache.flink.api.java.tuple.Tuple1 Tuple1} to
-	 * {@link org.apache.flink.api.java.tuple.Tuple25 Tuple25} . In case of a data sink, {@code -1}
-	 * is returned.
+	 * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or {@link StormBoltWrapper}
+	 * per declared output stream. The number is {@code -1} for raw output type or a value within range [0;25] for
+	 * output type {@link org.apache.flink.api.java.tuple.Tuple0 Tuple0} to
+	 * {@link org.apache.flink.api.java.tuple.Tuple25 Tuple25}.
 	 * 
 	 * @param spoutOrBolt
-	 * 		The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used.
-	 * @param rawOutput
-	 * 		Set to {@code true} if a single attribute output stream, should not be of type
-	 * 		{@link org.apache.flink.api.java.tuple.Tuple1 Tuple1} but be of a raw type.
-	 * @return The number of attributes to be used.
+	 *            The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used.
+	 * @param rawOutputs
+	 *            Contains the names of all streams for which the single attribute output should not be of type
+	 *            {@link org.apache.flink.api.java.tuple.Tuple1 Tuple1} but of a raw type. (Can be {@code null}.)
+	 * @return The number of attributes to be used for each stream.
 	 * @throws IllegalArgumentException
-	 * 		If {@code rawOuput} is {@code true} and the number of declared output
-	 * 		attributes is not 1 or if {@code rawOuput} is {@code false} and the number
-	 * 		of declared output attributes is not with range [1;25].
+	 *             If a stream is listed in {@code rawOutputs} and its number of declared output attributes is not 1,
+	 *             or if a stream is not listed in {@code rawOutputs} and its number of declared output attributes is
+	 *             not within range [0;25].
 	 */
-	public static int getNumberOfAttributes(final IComponent spoutOrBolt, final boolean rawOutput)
-			throws IllegalArgumentException {
+	public static HashMap<String, Integer> getNumberOfAttributes(final IComponent spoutOrBolt,
+			final Collection<String> rawOutputs)
+					throws IllegalArgumentException {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
 		spoutOrBolt.declareOutputFields(declarer);
 
-		final int declaredNumberOfAttributes = declarer.getNumberOfAttributes();
-
-		if (declaredNumberOfAttributes == -1) {
-			return -1;
-		}
-
-		if ((declaredNumberOfAttributes < 1) || (declaredNumberOfAttributes > 25)) {
-			throw new IllegalArgumentException(
-					"Provided bolt declares non supported number of output attributes. Must be in range [1;25] but " +
-							"was "
-							+ declaredNumberOfAttributes);
-		}
-
-		if (rawOutput) {
-			if (declaredNumberOfAttributes > 1) {
+		for (Entry<String, Integer> schema : declarer.outputSchemas.entrySet()) {
+			int declaredNumberOfAttributes = schema.getValue();
+			if ((declaredNumberOfAttributes < 0) || (declaredNumberOfAttributes > 25)) {
 				throw new IllegalArgumentException(
-						"Ouput type is requested to be raw type, but provided bolt declares more then one output " +
-						"attribute.");
+						"Provided bolt declares non supported number of output attributes. Must be in range [0;25] but "
+								+ "was " + declaredNumberOfAttributes);
+			}
 
+			if (rawOutputs != null && rawOutputs.contains(schema.getKey())) {
+				if (declaredNumberOfAttributes != 1) {
+					throw new IllegalArgumentException(
+							"Ouput type is requested to be raw type, but provided bolt declares more then one output "
+									+ "attribute.");
+				}
+				schema.setValue(-1);
 			}
-			return 0;
 		}
 
-		return declaredNumberOfAttributes;
+		return declarer.outputSchemas;
 	}
 
 	// TODO

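A sketch of the helper's new per-stream contract, assuming a hypothetical myBolt that declares stream "stream1" with one attribute and "stream2" with two:

    HashMap<String, Integer> schemas = StormWrapperSetupHelper.getNumberOfAttributes(
            myBolt, Sets.newHashSet("stream1"));
    // schemas: "stream1" -> -1 (raw type, because it is listed in rawOutputs)
    //          "stream2" ->  2 (emitted as Tuple2)
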
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
index 58d81f9..08ac60b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
@@ -14,12 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.stormcompatibility.api;
 
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
 import org.junit.Assert;
 import org.junit.Test;
@@ -31,84 +32,111 @@ public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
 
 
 	@Test
+	public void testNull() {
+		Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null));
+	}
+
+	@Test
 	public void testDeclare() {
-		for (int i = 0; i < 4; ++i) {
-			for (int j = 0; j <= 25; ++j) {
-				this.runDeclareTest(i, j);
+		for (int i = 0; i < 2; ++i) { // test case: simple / non-direct
+			for (int j = 1; j < 2; ++j) { // number of streams
+				for (int k = 0; k <= 25; ++k) { // number of attributes
+					this.runDeclareTest(i, j, k);
+				}
 			}
 		}
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testDeclareSimpleToManyAttributes() {
-		this.runDeclareTest(0, 26);
+		this.runDeclareTest(0, this.r.nextBoolean() ? 1 : 2, 26);
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testDeclareNonDirectToManyAttributes() {
-		this.runDeclareTest(1, 26);
+		this.runDeclareTest(1, this.r.nextBoolean() ? 1 : 2, 26);
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testDeclareDefaultStreamToManyAttributes() {
-		this.runDeclareTest(2, 26);
+		this.runDeclareTest(2, this.r.nextBoolean() ? 1 : 2, 26);
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testDeclareFullToManyAttributes() {
-		this.runDeclareTest(3, 26);
+		this.runDeclareTest(3, this.r.nextBoolean() ? 1 : 2, 26);
 	}
 
-	private void runDeclareTest(final int testCase, final int numberOfAttributes) {
+	private void runDeclareTest(final int testCase, final int numberOfStreams,
+			final int numberOfAttributes) {
 		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
 
+		String[] streams = null;
+		if (numberOfStreams > 1 || r.nextBoolean()) {
+			streams = new String[numberOfStreams];
+			for (int i = 0; i < numberOfStreams; ++i) {
+				streams[i] = "stream" + i;
+			}
+		}
+
 		final String[] attributes = new String[numberOfAttributes];
-		for (int i = 0; i < numberOfAttributes; ++i) {
+		for (int i = 0; i < attributes.length; ++i) {
 			attributes[i] = "a" + i;
 		}
 
 		switch (testCase) {
-			case 0:
-				this.declareSimple(declarer, attributes);
-				break;
-			case 1:
-				this.declareNonDirect(declarer, attributes);
-				break;
-			case 2:
-				this.declareDefaultStream(declarer, attributes);
-				break;
-			default:
-				this.declareFull(declarer, attributes);
+		case 0:
+			this.declareSimple(declarer, streams, attributes);
+			break;
+		default:
+			this.declareNonDirect(declarer, streams, attributes);
+		}
+
+		if (streams == null) {
+			streams = new String[] { Utils.DEFAULT_STREAM_ID };
 		}
 
-		final TypeInformation<?> type = declarer.getOutputType();
+		for (String stream : streams) {
+			final TypeInformation<?> type = declarer.getOutputType(stream);
 
-		if (numberOfAttributes == 0) {
-			Assert.assertNull(type);
-		} else {
-			Assert.assertEquals(numberOfAttributes, type.getArity());
 			if (numberOfAttributes == 1) {
-				Assert.assertFalse(type.isTupleType());
+				Assert.assertEquals(type.getClass(), GenericTypeInfo.class);
+				Assert.assertEquals(type.getTypeClass(), Object.class);
 			} else {
+				Assert.assertEquals(numberOfAttributes, type.getArity());
 				Assert.assertTrue(type.isTupleType());
 			}
 		}
 	}
 
-	private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
-		declarer.declare(new Fields(attributes));
-	}
+	private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
+			final String[] attributes) {
 
-	private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
-		declarer.declare(false, new Fields(attributes));
+		if (streams != null) {
+			for (String stream : streams) {
+				declarer.declareStream(stream, new Fields(attributes));
+			}
+		} else {
+			declarer.declare(new Fields(attributes));
+		}
 	}
 
-	private void declareDefaultStream(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
-		declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields(attributes));
+	private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
+			final String[] attributes) {
+
+		if (streams != null) {
+			for (String stream : streams) {
+				declarer.declareStream(stream, false, new Fields(attributes));
+			}
+		} else {
+			declarer.declare(false, new Fields(attributes));
+		}
 	}
 
-	private void declareFull(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
-		declarer.declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields(attributes));
+	@Test(expected = IllegalArgumentException.class)
+	public void testUndeclared() {
+		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+		declarer.getOutputType("unknownStreamId");
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
@@ -117,20 +145,10 @@ public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
-	public void testDeclareNonDefaultStrem() {
-		new FlinkOutputFieldsDeclarer().declareStream("dummy", null);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
 	public void testDeclareDirect2() {
 		new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
-	public void testDeclareNonDefaultStrem2() {
-		new FlinkOutputFieldsDeclarer().declareStream("dummy", this.r.nextBoolean(), null);
-	}
-
 	@Test
 	public void testGetGroupingFieldIndexes() {
 		final int numberOfAttributes = 5 + this.r.nextInt(21);
@@ -163,7 +181,8 @@ public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
 			}
 		}
 
-		final int[] result = declarer.getGroupingFieldIndexes(groupingFields);
+		final int[] result = declarer.getGroupingFieldIndexes(Utils.DEFAULT_STREAM_ID,
+				groupingFields);
 
 		Assert.assertEquals(expectedResult.length, result.length);
 		for (int i = 0; i < expectedResult.length; ++i) {

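The per-stream declarer API exercised above can be sketched as follows (the stream name "even" is made up; the class lives in the api package):

    FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
    declarer.declareStream("even", new Fields("token", "number"));
    TypeInformation<?> type = declarer.getOutputType("even"); // tuple type of arity 2
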
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
new file mode 100644
index 0000000..0187020
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import org.junit.Test;
+
+public class FlinkTopologyBuilderTest {
+
+	@Test(expected = RuntimeException.class)
+	public void testUnknownSpout() {
+		FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+		builder.setSpout("spout", new TestSpout());
+		builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown");
+		builder.createTopology();
+	}
+
+	@Test(expected = RuntimeException.class)
+	public void testUnknownBolt() {
+		FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+		builder.setSpout("spout", new TestSpout());
+		builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout");
+		builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown");
+		builder.createTopology();
+	}
+
+	@Test(expected = RuntimeException.class)
+	public void testUndeclaredStream() {
+		FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+		builder.setSpout("spout", new TestSpout());
+		builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout");
+		builder.createTopology();
+	}
+
+}

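For contrast with the failure cases above, a well-formed topology might look like the following sketch; NumberSpout and PrintBolt are hypothetical, and the bolt subscribes to a stream its source actually declares:

    FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
    builder.setSpout("numbers", new NumberSpout());  // declares stream "odd"
    builder.setBolt("printer", new PrintBolt())
            .shuffleGrouping("numbers", "odd");      // connect via stream name
    builder.createTopology();
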
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestBolt.java
new file mode 100644
index 0000000..2e4a534
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestBolt.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+public class TestBolt implements IRichBolt {
+	private static final long serialVersionUID = -667148827441397683L;
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}
+
+	@Override
+	public void execute(Tuple input) {}
+
+	@Override
+	public void cleanup() {}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestSpout.java
new file mode 100644
index 0000000..146218f
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestSpout.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import java.util.Map;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+
+public class TestSpout implements IRichSpout {
+	private static final long serialVersionUID = -4884029383198924007L;
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
+
+	@Override
+	public void close() {}
+
+	@Override
+	public void activate() {}
+
+	@Override
+	public void deactivate() {}
+
+	@Override
+	public void nextTuple() {}
+
+	@Override
+	public void ack(Object msgId) {}
+
+	@Override
+	public void fail(Object msgId) {}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
new file mode 100644
index 0000000..c0a6ed3
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wrappers;
+
+import java.util.Iterator;
+
+import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FlinkStormStreamSelectorTest {
+
+	@Test
+	public void testSelector() {
+		FlinkStormStreamSelector<Object> selector = new FlinkStormStreamSelector<Object>();
+		SplitStreamType<Object> tuple = new SplitStreamType<Object>();
+		Iterator<String> result;
+
+		tuple.streamId = "stream1";
+		result = selector.select(tuple).iterator();
+		Assert.assertEquals("stream1", result.next());
+		Assert.assertFalse(result.hasNext());
+
+		tuple.streamId = "stream2";
+		result = selector.select(tuple).iterator();
+		Assert.assertEquals("stream2", result.next());
+		Assert.assertFalse(result.hasNext());
+
+		tuple.streamId = "stream1";
+		result = selector.select(tuple).iterator();
+		Assert.assertEquals("stream1", result.next());
+		Assert.assertFalse(result.hasNext());
+	}
+
+}

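The selector plugs into Flink's split/select mechanism; a sketch, assuming boltOutput is a DataStream<SplitStreamType<Integer>> produced by a multi-stream wrapper:

    SplitDataStream<SplitStreamType<Integer>> split =
            boltOutput.split(new FlinkStormStreamSelector<Integer>());
    DataStream<SplitStreamType<Integer>> stream1 = split.select("stream1");
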
http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
index 3997505..d01c3e0 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
@@ -26,6 +26,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 
 import static org.mockito.Mockito.mock;
@@ -36,19 +37,23 @@ public class StormBoltCollectorTest extends AbstractTest {
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	public void testBoltStormCollector() throws InstantiationException, IllegalAccessException {
-		for (int numberOfAttributes = 0; numberOfAttributes < 26; ++numberOfAttributes) {
+		for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) {
 			final Output flinkCollector = mock(Output.class);
 			Tuple flinkTuple = null;
 			final Values tuple = new Values();
 
 			StormBoltCollector<?> collector;
 
-			if (numberOfAttributes == 0) {
-				collector = new StormBoltCollector(numberOfAttributes, flinkCollector);
+			final String streamId = "streamId";
+			HashMap<String, Integer> attributes = new HashMap<String, Integer>();
+			attributes.put(streamId, numberOfAttributes);
+
+			if (numberOfAttributes == -1) {
+				collector = new StormBoltCollector(attributes, flinkCollector);
 				tuple.add(new Integer(this.r.nextInt()));
 
 			} else {
-				collector = new StormBoltCollector(numberOfAttributes, flinkCollector);
+				collector = new StormBoltCollector(attributes, flinkCollector);
 				flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
 
 				for (int i = 0; i < numberOfAttributes; ++i) {
@@ -57,14 +62,13 @@ public class StormBoltCollectorTest extends AbstractTest {
 				}
 			}
 
-			final String streamId = "streamId";
 			final Collection anchors = mock(Collection.class);
 			final List<Integer> taskIds;
 			taskIds = collector.emit(streamId, anchors, tuple);
 
 			Assert.assertNull(taskIds);
 
-			if (numberOfAttributes == 0) {
+			if (numberOfAttributes == -1) {
 				verify(flinkCollector).collect(tuple.get(0));
 			} else {
 				verify(flinkCollector).collect(flinkTuple);
@@ -76,26 +80,26 @@ public class StormBoltCollectorTest extends AbstractTest {
 	@SuppressWarnings("unchecked")
 	@Test(expected = UnsupportedOperationException.class)
 	public void testReportError() {
-		new StormBoltCollector<Object>(1, mock(Output.class)).reportError(null);
+		new StormBoltCollector<Object>(mock(HashMap.class), mock(Output.class)).reportError(null);
 	}
 
-	@SuppressWarnings({"rawtypes", "unchecked"})
+	@SuppressWarnings("unchecked")
 	@Test(expected = UnsupportedOperationException.class)
 	public void testEmitDirect() {
-		new StormBoltCollector<Object>(1, mock(Output.class)).emitDirect(0, null,
+		new StormBoltCollector<Object>(mock(HashMap.class), mock(Output.class)).emitDirect(0, null,
 				null, null);
 	}
 
 	@SuppressWarnings("unchecked")
 	@Test(expected = UnsupportedOperationException.class)
 	public void testAck() {
-		new StormBoltCollector<Object>(1, mock(Output.class)).ack(null);
+		new StormBoltCollector<Object>(mock(HashMap.class), mock(Output.class)).ack(null);
 	}
 
 	@SuppressWarnings("unchecked")
 	@Test(expected = UnsupportedOperationException.class)
 	public void testFail() {
-		new StormBoltCollector<Object>(1, mock(Output.class)).fail(null);
+		new StormBoltCollector<Object>(mock(HashMap.class), mock(Output.class)).fail(null);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
index 3e55d23..2491486 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -20,10 +20,16 @@ package org.apache.flink.stormcompatibility.wrappers;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
 
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
@@ -34,18 +40,17 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.HashSet;
 import java.util.Map;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class})
-public class StormBoltWrapperTest {
+public class StormBoltWrapperTest extends AbstractTest {
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrapperRawType() throws Exception {
@@ -53,7 +58,8 @@ public class StormBoltWrapperTest {
 		declarer.declare(new Fields("dummy1", "dummy2"));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
-		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), true);
+		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class),
+				new String[] { Utils.DEFAULT_STREAM_ID });
 	}
 
 	@Test(expected = IllegalArgumentException.class)
@@ -79,38 +85,40 @@ public class StormBoltWrapperTest {
 		declarer.declare(new Fields(schema));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
-		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), false);
+		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), new String[] {});
 	}
 
 	@Test
 	public void testWrapper() throws Exception {
-		for (int i = 0; i < 26; ++i) {
+		for (int i = -1; i < 26; ++i) {
 			this.testWrapper(i);
 		}
 	}
 
 	@SuppressWarnings({"rawtypes", "unchecked"})
 	private void testWrapper(final int numberOfAttributes) throws Exception {
-		assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25));
+		assert ((-1 <= numberOfAttributes) && (numberOfAttributes <= 25));
 		Tuple flinkTuple = null;
 		String rawTuple = null;
 
-		if (numberOfAttributes == 0) {
+		if (numberOfAttributes == -1) {
 			rawTuple = "test";
 		} else {
 			flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
 		}
 
-		String[] schema = new String[numberOfAttributes];
-		if (numberOfAttributes == 0) {
+		String[] schema;
+		if (numberOfAttributes == -1) {
 			schema = new String[1];
+		} else {
+			schema = new String[numberOfAttributes];
 		}
 		for (int i = 0; i < schema.length; ++i) {
 			schema[i] = "a" + i;
 		}
 
 		final StreamRecord record = mock(StreamRecord.class);
-		if (numberOfAttributes == 0) {
+		if (numberOfAttributes == -1) {
 			when(record.getValue()).thenReturn(rawTuple);
 		} else {
 			when(record.getValue()).thenReturn(flinkTuple);
@@ -124,17 +132,63 @@ public class StormBoltWrapperTest {
 		declarer.declare(new Fields(schema));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
-		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, null);
+		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null);
 		wrapper.setup(mock(Output.class), taskContext);
 		wrapper.open(new Configuration());
 
 		wrapper.processElement(record);
-		if (numberOfAttributes == 0) {
+		if (numberOfAttributes == -1) {
 			verify(bolt).execute(eq(new StormTuple<String>(rawTuple, null)));
 		} else {
 			verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple, null)));
 		}
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testMultipleOutputStreams() throws Exception {
+		final boolean rawOutType1 = super.r.nextBoolean();
+		final boolean rawOutType2 = super.r.nextBoolean();
+
+		final StreamRecord record = mock(StreamRecord.class);
+		when(record.getValue()).thenReturn(2).thenReturn(3);
+
+		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+		Output output = mock(Output.class);
+
+		TestBolt bolt = new TestBolt();
+		HashSet<String> raw = new HashSet<String>();
+		if (rawOutType1) {
+			raw.add("stream1");
+		}
+		if (rawOutType2) {
+			raw.add("stream2");
+		}
+
+		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null, raw);
+		wrapper.setup(output, taskContext);
+		wrapper.open(new Configuration());
+
+		SplitStreamType splitRecord = new SplitStreamType<Integer>();
+		if (rawOutType1) {
+			splitRecord.streamId = "stream1";
+			splitRecord.value = 2;
+		} else {
+			splitRecord.streamId = "stream1";
+			splitRecord.value = new Tuple1<Integer>(2);
+		}
+		wrapper.processElement(record);
+		verify(output).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
 
+		if (rawOutType2) {
+			splitRecord.streamId = "stream2";
+			splitRecord.value = 3;
+		} else {
+			splitRecord.streamId = "stream2";
+			splitRecord.value = new Tuple1<Integer>(3);
+		}
+		wrapper.processElement(record);
+		verify(output, times(2)).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
 	}
 
 	@SuppressWarnings("unchecked")
@@ -185,4 +239,40 @@ public class StormBoltWrapperTest {
 		verify(bolt).cleanup();
 	}
 
+	private static final class TestBolt implements IRichBolt {
+		private static final long serialVersionUID = 7278692872260138758L;
+		private OutputCollector collector;
+
+		@SuppressWarnings("rawtypes")
+		@Override
+		public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+			this.collector = collector;
+		}
+
+		int counter = 0;
+		@Override
+		public void execute(backtype.storm.tuple.Tuple input) {
+			if (++counter % 2 == 1) {
+				this.collector.emit("stream1", new Values(input.getInteger(0)));
+			} else {
+				this.collector.emit("stream2", new Values(input.getInteger(0)));
+			}
+		}
+
+		@Override
+		public void cleanup() {}
+
+		@Override
+		public void declareOutputFields(OutputFieldsDeclarer declarer) {
+			declarer.declareStream("stream1", new Fields("a1"));
+			declarer.declareStream("stream2", new Fields("a2"));
+		}
+
+		@Override
+		public Map<String, Object> getComponentConfiguration() {
+			return null;
+		}
+	}
+
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
index cfde770..a28b6e5 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.stormcompatibility.wrappers;
 
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
+
 import org.apache.flink.stormcompatibility.util.AbstractTest;
 import org.junit.Assert;
 import org.junit.Test;
@@ -31,50 +32,60 @@ public class StormOutputFieldsDeclarerTest extends AbstractTest {
 	public void testDeclare() {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
 
-		Assert.assertEquals(-1, declarer.getNumberOfAttributes());
+		int numberOfAttributes = this.r.nextInt(26);
+		declarer.declare(createSchema(numberOfAttributes));
+		Assert.assertEquals(1, declarer.outputSchemas.size());
+		Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID)
+				.intValue());
+
+		final String sid = "streamId";
+		numberOfAttributes = this.r.nextInt(26);
+		declarer.declareStream(sid, createSchema(numberOfAttributes));
+		Assert.assertEquals(2, declarer.outputSchemas.size());
+		Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(sid).intValue());
+	}
 
-		final int numberOfAttributes = 1 + this.r.nextInt(25);
+	private Fields createSchema(final int numberOfAttributes) {
 		final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes);
 		for (int i = 0; i < numberOfAttributes; ++i) {
 			schema.add("a" + i);
 		}
-		declarer.declare(new Fields(schema));
-		Assert.assertEquals(numberOfAttributes, declarer.getNumberOfAttributes());
+		return new Fields(schema);
 	}
 
 	@Test
 	public void testDeclareDirect() {
-		new StormOutputFieldsDeclarer().declare(false, null);
+		new StormOutputFieldsDeclarer().declare(false, new Fields());
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
 	public void testDeclareDirectFail() {
-		new StormOutputFieldsDeclarer().declare(true, null);
+		new StormOutputFieldsDeclarer().declare(true, new Fields());
 	}
 
 	@Test
 	public void testDeclareStream() {
-		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, null);
+		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, new Fields());
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
+	@Test(expected = IllegalArgumentException.class)
 	public void testDeclareStreamFail() {
-		new StormOutputFieldsDeclarer().declareStream(null, null);
+		new StormOutputFieldsDeclarer().declareStream(null, new Fields());
 	}
 
 	@Test
 	public void testDeclareFullStream() {
-		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, null);
+		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields());
 	}
 
-	@Test(expected = UnsupportedOperationException.class)
+	@Test(expected = IllegalArgumentException.class)
 	public void testDeclareFullStreamFailNonDefaultStream() {
-		new StormOutputFieldsDeclarer().declareStream(null, false, null);
+		new StormOutputFieldsDeclarer().declareStream(null, false, new Fields());
 	}
 
 	@Test(expected = UnsupportedOperationException.class)
 	public void testDeclareFullStreamFailDirect() {
-		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
+		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, new Fields());
 	}
 
 }
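
For reference, the per-stream schema tracking verified above corresponds to the
standard Storm declaration pattern below (a minimal sketch; the bolt class and
the "error" stream name are illustrative, not part of this commit):

    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class MultiStreamBolt extends BaseRichBolt {
        private static final long serialVersionUID = 1L;
        private OutputCollector collector;

        @SuppressWarnings("rawtypes")
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            // forward input on the (implicit) default stream
            this.collector.emit(new Values(input.getString(0), 1));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // recorded by StormOutputFieldsDeclarer as DEFAULT_STREAM_ID -> 2
            declarer.declare(new Fields("word", "count"));
            // recorded as "error" -> 1
            declarer.declareStream("error", new Fields("message"));
        }
    }

The wrapped declarer records each declared stream as a (stream id -> number of
attributes) entry, which is what the assertions on declarer.outputSchemas check.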

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
index e4826bb..36ed58a 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollectorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.HashMap;
 import java.util.List;
 
 import static org.mockito.Mockito.mock;
@@ -35,19 +36,23 @@ public class StormSpoutCollectorTest extends AbstractTest {
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException {
-		for (int numberOfAttributes = 0; numberOfAttributes < 26; ++numberOfAttributes) {
+		for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) {
 			final SourceContext flinkCollector = mock(SourceContext.class);
 			Tuple flinkTuple = null;
 			final Values tuple = new Values();
 
 			StormSpoutCollector<?> collector;
 
-			if (numberOfAttributes == 0) {
-				collector = new StormSpoutCollector(numberOfAttributes, flinkCollector);
+			final String streamId = "streamId";
+			HashMap<String, Integer> attributes = new HashMap<String, Integer>();
+			attributes.put(streamId, numberOfAttributes);
+
+			if (numberOfAttributes == -1) {
+				collector = new StormSpoutCollector(attributes, flinkCollector);
 				tuple.add(new Integer(this.r.nextInt()));
 
 			} else {
-				collector = new StormSpoutCollector(numberOfAttributes, flinkCollector);
+				collector = new StormSpoutCollector(attributes, flinkCollector);
 				flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
 
 				for (int i = 0; i < numberOfAttributes; ++i) {
@@ -56,7 +61,6 @@ public class StormSpoutCollectorTest extends AbstractTest {
 				}
 			}
 
-			final String streamId = "streamId";
 			final List<Integer> taskIds;
 			final Object messageId = new Integer(this.r.nextInt());
 
@@ -64,7 +68,7 @@ public class StormSpoutCollectorTest extends AbstractTest {
 
 			Assert.assertNull(taskIds);
 
-			if (numberOfAttributes == 0) {
+			if (numberOfAttributes == -1) {
 				verify(flinkCollector).collect(tuple.get(0));
 			} else {
 				verify(flinkCollector).collect(flinkTuple);
@@ -75,13 +79,15 @@ public class StormSpoutCollectorTest extends AbstractTest {
 	@SuppressWarnings("unchecked")
 	@Test(expected = UnsupportedOperationException.class)
 	public void testReportError() {
-		new StormSpoutCollector<Object>(1, mock(SourceContext.class)).reportError(null);
+		new StormSpoutCollector<Object>(mock(HashMap.class), mock(SourceContext.class))
+				.reportError(null);
 	}
 
 	@SuppressWarnings("unchecked")
 	@Test(expected = UnsupportedOperationException.class)
 	public void testEmitDirect() {
-		new StormSpoutCollector<Object>(1, mock(SourceContext.class)).emitDirect(0, null, null,
+		new StormSpoutCollector<Object>(mock(HashMap.class), mock(SourceContext.class)).emitDirect(
+				0, null, null,
 				(Object) null);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
index 96e7b35..06d5399 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
@@ -60,7 +60,7 @@ public class StormTupleTest extends AbstractTest {
 
 	@Test
 	public void tupleTest() throws InstantiationException, IllegalAccessException {
-		final int numberOfAttributes = 1 + this.r.nextInt(25);
+		final int numberOfAttributes = this.r.nextInt(26);
 		final Object[] data = new Object[numberOfAttributes];
 
 		final Tuple flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
index 15129ce..7497ffc 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
@@ -17,10 +17,14 @@
 
 package org.apache.flink.stormcompatibility.wrappers;
 
+import java.util.HashMap;
+
 import backtype.storm.topology.IComponent;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
 import org.apache.flink.stormcompatibility.util.AbstractTest;
 import org.junit.Assert;
 import org.junit.Test;
@@ -29,29 +33,14 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import com.google.common.collect.Sets;
+
 import static org.mockito.Mockito.mock;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(StormWrapperSetupHelper.class)
 public class StormWrapperSetupHelperTest extends AbstractTest {
 
-	@Test(expected = IllegalArgumentException.class)
-	public void testZeroAttributesDeclarerBolt() throws Exception {
-		IComponent boltOrSpout;
-
-		if (this.r.nextBoolean()) {
-			boltOrSpout = mock(IRichSpout.class);
-		} else {
-			boltOrSpout = mock(IRichBolt.class);
-		}
-
-		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
-		declarer.declare(new Fields());
-		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-
-		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, this.r.nextBoolean());
-	}
-
 	@Test
 	public void testEmptyDeclarerBolt() {
 		IComponent boltOrSpout;
@@ -62,7 +51,8 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
 			boltOrSpout = mock(IRichBolt.class);
 		}
 
-		Assert.assertEquals(-1, StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, this.r.nextBoolean()));
+		Assert.assertEquals(new HashMap<String, Integer>(),
+				StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, null));
 	}
 
 	@Test(expected = IllegalArgumentException.class)
@@ -79,7 +69,8 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
 		declarer.declare(new Fields("dummy1", "dummy2"));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
-		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, true);
+		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
+				Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }));
 	}
 
 	@Test(expected = IllegalArgumentException.class)
@@ -100,20 +91,22 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
 		declarer.declare(new Fields(schema));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
-		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, false);
+		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, null);
 	}
 
 	@Test
 	public void testTupleTypes() throws Exception {
-		for (int i = 0; i < 26; ++i) {
+		for (int i = -1; i < 26; ++i) {
 			this.testTupleTypes(i);
 		}
 	}
 
 	private void testTupleTypes(final int numberOfAttributes) throws Exception {
-		String[] schema = new String[numberOfAttributes];
-		if (numberOfAttributes == 0) {
+		String[] schema;
+		if (numberOfAttributes == -1) {
 			schema = new String[1];
+		} else {
+			schema = new String[numberOfAttributes];
 		}
 		for (int i = 0; i < schema.length; ++i) {
 			schema[i] = "a" + i;
@@ -130,7 +123,13 @@ public class StormWrapperSetupHelperTest extends AbstractTest {
 		declarer.declare(new Fields(schema));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
-		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, numberOfAttributes == 0);
+		HashMap<String, Integer> attributes = new HashMap<String, Integer>();
+		attributes.put(Utils.DEFAULT_STREAM_ID, numberOfAttributes);
+
+		Assert.assertEquals(attributes, StormWrapperSetupHelper.getNumberOfAttributes(
+				boltOrSpout,
+				numberOfAttributes == -1 ? Sets
+						.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }) : null));
 	}
 
 }
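
As the updated tests show, getNumberOfAttributes() no longer takes a boolean
"raw output" flag but a set of stream ids to be treated as raw types, and it
returns a (stream id -> attribute count) map instead of a single int. A minimal
call sketch, assuming the caller sits in the same package as the helper (as the
tests do) and that the return type is HashMap as the assertions suggest:

    import java.util.HashMap;

    import backtype.storm.topology.IRichSpout;
    import backtype.storm.utils.Utils;

    import com.google.common.collect.Sets;

    public class SetupHelperSketch {
        // 'spout' is any spout whose declareOutputFields() has been implemented;
        // passing null instead of the set means no stream is treated as a raw type
        public static HashMap<String, Integer> rawDefaultSchema(IRichSpout spout) {
            return StormWrapperSetupHelper.getNumberOfAttributes(
                    spout, Sets.newHashSet(Utils.DEFAULT_STREAM_ID));
        }
    }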

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
index 4621650..430972b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/pom.xml
@@ -135,6 +135,7 @@ under the License.
 				<executions>
 
 					<!-- WordCount Spout source-->
+					<!-- example for an embedded spout - for whole topologies see the "WordCount Storm topology" example below -->
 					<execution>
 						<id>WordCount-SpoutSource</id>
 						<phase>package</phase>
@@ -176,6 +177,7 @@ under the License.
 					</execution>
 
 					<!-- WordCount Bolt tokenizer-->
+					<!-- example for an embedded bolt - for whole topologies see the "WordCount Storm topology" example below -->
 					<execution>
 						<id>WordCount-BoltTokenizer</id>
 						<phase>package</phase>
@@ -222,6 +224,7 @@ under the License.
 			</plugin>
 
 			<!-- WordCount Storm topology-->
+			<!-- example for whole topologies (i.e., if FlinkTopologyBuilder is used) -->
 			<!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar -->
 			<plugin>
 				<artifactId>maven-assembly-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
index 7bcb7f9..ee5d9f9 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java
@@ -26,6 +26,8 @@ import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import backtype.storm.utils.Utils;
+
 /**
 * Implements the "Exclamation" program that attaches five exclamation marks to every line of a text
 * file in a streaming fashion. The program is constructed as a regular {@link StormTopology}.
@@ -65,8 +67,9 @@ public class ExclamationWithStormBolt {
 		final DataStream<String> exclaimed = text
 				.transform("StormBoltTokenizer",
 						TypeExtractor.getForObject(""),
-						new StormBoltWrapper<String, String>(new ExclamationBolt(), true))
-				.map(new ExclamationMap());
+				new StormBoltWrapper<String, String>(new ExclamationBolt(),
+						new String[] { Utils.DEFAULT_STREAM_ID }))
+						.map(new ExclamationMap());
 
 		// emit result
 		if (fileOutput) {
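
The change above illustrates the new wrapper signature: the former boolean
"rawOutput" flag is replaced by an explicit array of stream ids whose output
should be emitted as raw types. Side by side (fragments, not a compiling unit,
since the boolean overload no longer exists after this commit):

    // before: raw output enabled via a flag for the (implicit) default stream
    new StormBoltWrapper<String, String>(new ExclamationBolt(), true);

    // after: the raw-typed streams are named explicitly by id
    new StormBoltWrapper<String, String>(new ExclamationBolt(),
            new String[] { Utils.DEFAULT_STREAM_ID });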

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
index f027eae..962a318 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java
@@ -27,6 +27,8 @@ import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import backtype.storm.utils.Utils;
+
 /**
 * Implements the "Exclamation" program that attaches five exclamation marks to every line of a text
 * file in a streaming fashion. The program is constructed as a regular {@link StormTopology}.
@@ -84,6 +86,7 @@ public class ExclamationWithStormSpout {
 	// *************************************************************************
 
 	private static class ExclamationMap implements MapFunction<String, String> {
+		private static final long serialVersionUID = -684993133807698042L;
 
 		@Override
 		public String map(String value) throws Exception {
@@ -126,13 +129,14 @@ public class ExclamationWithStormSpout {
 			final String[] tokens = textPath.split(":");
 			final String localFile = tokens[tokens.length - 1];
 			return env.addSource(
-					new FiniteStormSpoutWrapper<String>(new FiniteStormFileSpout(localFile), true),
-					TypeExtractor.getForClass(String.class)).setParallelism(1);
+					new FiniteStormSpoutWrapper<String>(new FiniteStormFileSpout(localFile),
+							new String[] { Utils.DEFAULT_STREAM_ID }),
+							TypeExtractor.getForClass(String.class)).setParallelism(1);
 		}
 
 		return env.addSource(
-				new FiniteStormSpoutWrapper<String>(
-						new FiniteStormInMemorySpout(WordCountData.WORDS), true),
+				new FiniteStormSpoutWrapper<String>(new FiniteStormInMemorySpout(
+						WordCountData.WORDS), new String[] { Utils.DEFAULT_STREAM_ID }),
 				TypeExtractor.getForClass(String.class)).setParallelism(1);
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
new file mode 100644
index 0000000..4116f3c
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.split;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.stormcompatibility.split.stormoperators.RandomSpout;
+import org.apache.flink.stormcompatibility.split.stormoperators.VerifyAndEnrichBolt;
+import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
+import org.apache.flink.stormcompatibility.util.SplitStreamMapper;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements a simple example with two declared output streams for the embedded Spout.
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>handle multiple output streams of a spout</li>
+ * <li>access each stream via .split(...) and .select(...)</li>
+ * <li>strip the wrapper data type SplitStreamType for further processing in Flink</li>
+ * </ul>
+ * <p/>
+ * This example would work the same way for multiple bolt output streams.
+ */
+public class SpoutSplitExample {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(final String[] args) throws Exception {
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		String[] rawOutputs = new String[] { RandomSpout.EVEN_STREAM, RandomSpout.ODD_STREAM };
+
+		final DataStream<SplitStreamType<Integer>> numbers = env.addSource(
+				new StormSpoutWrapper<SplitStreamType<Integer>>(new RandomSpout(true, 0),
+						rawOutputs), TypeExtractor.getForObject(new SplitStreamType<Integer>()));
+
+		SplitDataStream<SplitStreamType<Integer>> splitStream = numbers
+				.split(new FlinkStormStreamSelector<Integer>());
+
+		DataStream<SplitStreamType<Integer>> evenStream = splitStream.select(RandomSpout.EVEN_STREAM);
+		DataStream<SplitStreamType<Integer>> oddStream = splitStream.select(RandomSpout.ODD_STREAM);
+
+		evenStream.map(new SplitStreamMapper<Integer>()).returns(Integer.class).map(new Enrich("even")).print();
+		oddStream.transform("oddBolt",
+				TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
+				new StormBoltWrapper<SplitStreamType<Integer>, Tuple2<String, Integer>>(
+						new VerifyAndEnrichBolt(false)))
+						.print();
+
+		// execute program
+		env.execute("Spout split stream example");
+	}
+
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Same as {@link VerifyAndEnrichBolt}.
+	 */
+	private static final class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 5213888269197438892L;
+		private final Tuple2<String, Integer> out;
+
+		public Enrich(String token) {
+			this.out = new Tuple2<String, Integer>(token, 0);
+		}
+
+		@Override
+		public Tuple2<String, Integer> map(Integer value) throws Exception {
+			this.out.setField(value, 1);
+			return this.out;
+		}
+	}
+
+}
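
A note on the strip step used in the example: embedded components with multiple
output streams produce SplitStreamType-wrapped records, so each selected stream
is unwrapped before regular Flink operators consume it. The pattern in
isolation, adapted from the example above (.returns(...) re-declares the
element type after the unwrapping map):

    DataStream<Integer> evenNumbers = evenStream
            .map(new SplitStreamMapper<Integer>()) // unwrap SplitStreamType<Integer>
            .returns(Integer.class);               // declare the unwrapped element type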

http://git-wip-us.apache.org/repos/asf/flink/blob/3a830299/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java
new file mode 100644
index 0000000..75d710e
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/stormoperators/RandomSpout.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.split.stormoperators;
+
+import java.util.Map;
+import java.util.Random;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+public class RandomSpout extends BaseRichSpout {
+	private static final long serialVersionUID = -3978554318742509334L;
+
+	public static final String EVEN_STREAM = "even";
+	public static final String ODD_STREAM = "odd";
+
+	private final boolean split;
+	private final Random r;
+	private SpoutOutputCollector collector;
+
+	public RandomSpout(boolean split, long seed) {
+		this.split = split;
+		this.r = new Random(seed);
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+		this.collector = collector;
+	}
+
+	@Override
+	public void nextTuple() {
+		int i = r.nextInt();
+		if (split) {
+			if (i % 2 == 0) {
+				this.collector.emit(EVEN_STREAM, new Values(i));
+			} else {
+				this.collector.emit(ODD_STREAM, new Values(i));
+			}
+		} else {
+			this.collector.emit(new Values(i));
+		}
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		Fields schema = new Fields("number");
+		if (split) {
+			declarer.declareStream(EVEN_STREAM, schema);
+			declarer.declareStream(ODD_STREAM, schema);
+		} else {
+			declarer.declare(schema);
+		}
+	}
+
+}


[4/4] flink git commit: [tests] Move UnionClosedBranchingTest to optimizer tests

Posted by se...@apache.org.
[tests] Move UnionClosedBranchingTest to optimizer tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a82bd431
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a82bd431
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a82bd431

Branch: refs/heads/master
Commit: a82bd4311240c3eca1656a05b149d85734fd6196
Parents: c9cfb17
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 21 17:58:50 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 21 19:35:39 2015 +0200

----------------------------------------------------------------------
 .../dataexchange/UnionClosedBranchingTest.java  | 192 +++++++++++++++++++
 .../flink/test/UnionClosedBranchingTest.java    | 192 -------------------
 2 files changed, 192 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a82bd431/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/UnionClosedBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/UnionClosedBranchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/UnionClosedBranchingTest.java
new file mode 100644
index 0000000..b870a91
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/UnionClosedBranchingTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer.dataexchange;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.runtime.io.network.DataExchangeMode.BATCH;
+import static org.apache.flink.runtime.io.network.DataExchangeMode.PIPELINED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This tests a fix for FLINK-2540.
+ *
+ * <p> This test is necessary, because {@link NAryUnionPlanNode}s are not directly translated
+ * to runtime tasks by the {@link JobGraphGenerator}. Instead, the network stack unions the
+ * inputs by directly reading from multiple inputs (via {@link UnionInputGate}).
+ *
+ * <pre>
+ *   (source)-\        /-\
+ *            (union)-+  (join)
+ *   (source)-/        \-/
+ * </pre>
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-2540">FLINK-2540</a>
+ */
+@RunWith(Parameterized.class)
+@SuppressWarnings({"serial","unchecked"})
+public class UnionClosedBranchingTest extends CompilerTestBase {
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> params() {
+		Collection<Object[]> params = Arrays.asList(new Object[][]{
+				{ExecutionMode.PIPELINED, PIPELINED, BATCH},
+				{ExecutionMode.PIPELINED_FORCED, PIPELINED, PIPELINED},
+				{ExecutionMode.BATCH, BATCH, BATCH},
+				{ExecutionMode.BATCH_FORCED, BATCH, BATCH},
+		});
+
+		// Make sure that changes to ExecutionMode are reflected in this test.
+		assertEquals(ExecutionMode.values().length, params.size());
+
+		return params;
+	}
+
+	private final ExecutionMode executionMode;
+
+	/** Expected {@link DataExchangeMode} from sources to union. */
+	private final DataExchangeMode sourceToUnion;
+
+	/** Expected {@link DataExchangeMode} from union to join. */
+	private final DataExchangeMode unionToJoin;
+
+	public UnionClosedBranchingTest(
+			ExecutionMode executionMode,
+			DataExchangeMode sourceToUnion,
+			DataExchangeMode unionToJoin) {
+
+		this.executionMode = executionMode;
+		this.sourceToUnion = sourceToUnion;
+		this.unionToJoin = unionToJoin;
+	}
+
+	@Test
+	public void testUnionClosedBranchingTest() throws Exception {
+
+		// -----------------------------------------------------------------------------------------
+		// Build test program
+		// -----------------------------------------------------------------------------------------
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setExecutionMode(executionMode);
+		env.setParallelism(4);
+		
+		DataSet<Tuple1<Integer>> src1 = env.fromElements(new Tuple1<>(0), new Tuple1<>(1));
+
+		DataSet<Tuple1<Integer>> src2 = env.fromElements(new Tuple1<>(0), new Tuple1<>(1));
+
+		DataSet<Tuple1<Integer>> union = src1.union(src2);
+
+		DataSet<Tuple2<Integer, Integer>> join = union
+				.join(union).where(0).equalTo(0)
+				.projectFirst(0).projectSecond(0);
+
+		join.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
+
+		// -----------------------------------------------------------------------------------------
+		// Verify optimized plan
+		// -----------------------------------------------------------------------------------------
+
+		OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+		SinkPlanNode sinkNode = optimizedPlan.getDataSinks().iterator().next();
+
+		DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+		// Verify that the compiler correctly sets the expected data exchange modes.
+		for (Channel channel : joinNode.getInputs()) {
+			assertEquals("Unexpected data exchange mode between union and join node.",
+					unionToJoin, channel.getDataExchangeMode());
+		}
+
+		for (SourcePlanNode src : optimizedPlan.getDataSources()) {
+			for (Channel channel : src.getOutgoingChannels()) {
+				assertEquals("Unexpected data exchange mode between source and union node.",
+						sourceToUnion, channel.getDataExchangeMode());
+			}
+		}
+
+		// -----------------------------------------------------------------------------------------
+		// Verify generated JobGraph
+		// -----------------------------------------------------------------------------------------
+
+		JobGraphGenerator jgg = new JobGraphGenerator();
+		JobGraph jobGraph = jgg.compileJobGraph(optimizedPlan);
+
+		List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+
+		// Sanity check for the test setup
+		assertEquals("Unexpected number of vertices created.", 4, vertices.size());
+
+		// Verify all sources
+		JobVertex[] sources = new JobVertex[]{vertices.get(0), vertices.get(1)};
+
+		for (JobVertex src : sources) {
+			// Sanity check
+			assertTrue("Unexpected vertex type. Test setup is broken.", src.isInputVertex());
+
+			// The union is not translated to an extra union task, but the join uses a union
+			// input gate to read multiple inputs. Each source creates a single result per consumer.
+			assertEquals("Unexpected number of created results.", 2,
+					src.getNumberOfProducedIntermediateDataSets());
+
+			for (IntermediateDataSet dataSet : src.getProducedDataSets()) {
+				ResultPartitionType dsType = dataSet.getResultType();
+
+				// The result type is determined by the channel between the union and the join node
+				// and *not* the channel between source and union.
+				if (unionToJoin.equals(BATCH)) {
+					assertTrue("Expected batch exchange, but result type is " + dsType + ".",
+							dsType.isBlocking());
+				} else {
+					assertFalse("Expected non-batch exchange, but result type is " + dsType + ".",
+							dsType.isBlocking());
+				}
+			}
+		}
+	}
+
+}
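
For users affected by FLINK-2540, the parameter table above also serves as a
reference for which data exchange modes the optimizer picks: they are driven by
the program's ExecutionMode. A minimal sketch of setting it (the setter is used
exactly as in the test):

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ExecutionModeSketch {
        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // BATCH forces blocking exchanges between union and join (see table above)
            env.getConfig().setExecutionMode(ExecutionMode.BATCH);
        }
    }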

http://git-wip-us.apache.org/repos/asf/flink/blob/a82bd431/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java b/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java
deleted file mode 100644
index f7ea911..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.test;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import static org.apache.flink.runtime.io.network.DataExchangeMode.BATCH;
-import static org.apache.flink.runtime.io.network.DataExchangeMode.PIPELINED;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * This tests a fix for FLINK-2540.
- *
- * <p> This test is necessary, because {@link NAryUnionPlanNode}s are not directly translated
- * to runtime tasks by the {@link JobGraphGenerator}. Instead, the network stack unions the
- * inputs by directly reading from multiple inputs (via {@link UnionInputGate}).
- *
- * <pre>
- *   (source)-\        /-\
- *            (union)-+  (join)
- *   (source)-/        \-/
- * </pre>
- *
- * @see <a href="https://issues.apache.org/jira/browse/FLINK-2540">FLINK-2540</a>
- */
-@RunWith(Parameterized.class)
-@SuppressWarnings({"serial","unchecked"})
-public class UnionClosedBranchingTest extends CompilerTestBase {
-
-	@Parameterized.Parameters
-	public static Collection<Object[]> params() {
-		Collection<Object[]> params = Arrays.asList(new Object[][]{
-				{ExecutionMode.PIPELINED, PIPELINED, BATCH},
-				{ExecutionMode.PIPELINED_FORCED, PIPELINED, PIPELINED},
-				{ExecutionMode.BATCH, BATCH, BATCH},
-				{ExecutionMode.BATCH_FORCED, BATCH, BATCH},
-		});
-
-		// Make sure that changes to ExecutionMode are reflected in this test.
-		assertEquals(ExecutionMode.values().length, params.size());
-
-		return params;
-	}
-
-	private final ExecutionMode executionMode;
-
-	/** Expected {@link DataExchangeMode} from sources to union. */
-	private final DataExchangeMode sourceToUnion;
-
-	/** Expected {@link DataExchangeMode} from union to join. */
-	private final DataExchangeMode unionToJoin;
-
-	public UnionClosedBranchingTest(
-			ExecutionMode executionMode,
-			DataExchangeMode sourceToUnion,
-			DataExchangeMode unionToJoin) {
-
-		this.executionMode = executionMode;
-		this.sourceToUnion = sourceToUnion;
-		this.unionToJoin = unionToJoin;
-	}
-
-	@Test
-	public void testUnionClosedBranchingTest() throws Exception {
-
-		// -----------------------------------------------------------------------------------------
-		// Build test program
-		// -----------------------------------------------------------------------------------------
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().setExecutionMode(executionMode);
-		env.setParallelism(4);
-		
-		DataSet<Tuple1<Integer>> src1 = env.fromElements(new Tuple1<>(0), new Tuple1<>(1));
-
-		DataSet<Tuple1<Integer>> src2 = env.fromElements(new Tuple1<>(0), new Tuple1<>(1));
-
-		DataSet<Tuple1<Integer>> union = src1.union(src2);
-
-		DataSet<Tuple2<Integer, Integer>> join = union
-				.join(union).where(0).equalTo(0)
-				.projectFirst(0).projectSecond(0);
-
-		join.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
-
-		// -----------------------------------------------------------------------------------------
-		// Verify optimized plan
-		// -----------------------------------------------------------------------------------------
-
-		OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
-
-		SinkPlanNode sinkNode = optimizedPlan.getDataSinks().iterator().next();
-
-		DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
-
-		// Verify that the compiler correctly sets the expected data exchange modes.
-		for (Channel channel : joinNode.getInputs()) {
-			assertEquals("Unexpected data exchange mode between union and join node.",
-					unionToJoin, channel.getDataExchangeMode());
-		}
-
-		for (SourcePlanNode src : optimizedPlan.getDataSources()) {
-			for (Channel channel : src.getOutgoingChannels()) {
-				assertEquals("Unexpected data exchange mode between source and union node.",
-						sourceToUnion, channel.getDataExchangeMode());
-			}
-		}
-
-		// -----------------------------------------------------------------------------------------
-		// Verify generated JobGraph
-		// -----------------------------------------------------------------------------------------
-
-		JobGraphGenerator jgg = new JobGraphGenerator();
-		JobGraph jobGraph = jgg.compileJobGraph(optimizedPlan);
-
-		List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-
-		// Sanity check for the test setup
-		assertEquals("Unexpected number of vertices created.", 4, vertices.size());
-
-		// Verify all sources
-		JobVertex[] sources = new JobVertex[]{vertices.get(0), vertices.get(1)};
-
-		for (JobVertex src : sources) {
-			// Sanity check
-			assertTrue("Unexpected vertex type. Test setup is broken.", src.isInputVertex());
-
-			// The union is not translated to an extra union task, but the join uses a union
-			// input gate to read multiple inputs. Each source creates a single result per consumer.
-			assertEquals("Unexpected number of created results.", 2,
-					src.getNumberOfProducedIntermediateDataSets());
-
-			for (IntermediateDataSet dataSet : src.getProducedDataSets()) {
-				ResultPartitionType dsType = dataSet.getResultType();
-
-				// The result type is determined by the channel between the union and the join node
-				// and *not* the channel between source and union.
-				if (unionToJoin.equals(BATCH)) {
-					assertTrue("Expected batch exchange, but result type is " + dsType + ".",
-							dsType.isBlocking());
-				} else {
-					assertFalse("Expected non-batch exchange, but result type is " + dsType + ".",
-							dsType.isBlocking());
-				}
-			}
-		}
-	}
-
-}