You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/24 15:45:14 UTC

flink git commit: [FLINK-2862] [Storm Compatibility] FlinkTopologyBuilder should use proper generic types

Repository: flink
Updated Branches:
  refs/heads/master 4a3151681 -> fa88d9eb1


[FLINK-2862] [Storm Compatibility] FlinkTopologyBuilder should use proper generic types

This closes #1274


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa88d9eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa88d9eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa88d9eb

Branch: refs/heads/master
Commit: fa88d9eb1b88641d6ca03b38e2058d30971c7a2f
Parents: 4a31516
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Tue Oct 20 18:06:00 2015 +0200
Committer: mjsax <mj...@apache.org>
Committed: Sat Oct 24 13:04:10 2015 +0200

----------------------------------------------------------------------
 docs/apis/storm_compatibility.md                |   8 +-
 .../storm/api/FlinkOutputFieldsDeclarer.java    |   6 +-
 .../flink/storm/api/FlinkTopologyBuilder.java   | 107 +++++++++++--------
 .../storm/util/SplitStreamTypeKeySelector.java  |  47 --------
 .../flink/storm/wrappers/BoltWrapper.java       |  11 +-
 .../api/FlinkOutputFieldsDeclarerTest.java      |  12 +--
 6 files changed, 71 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index bf80d4e..e3e11ab 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -209,7 +209,6 @@ If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., ther
 For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitStream.select(...)`.
 Flink provides the predefined output selector `StormStreamSelector<T>` for `.split(...)` already.
 Furthermore, the wrapper type `SplitStreamTuple<T>` can be removed using `SplitStreamMapper<T>`.
-If a data stream of type `SplitStreamTuple<T>` is used as input for a Bolt, it is **not** required to strip the wrapper &ndash; `BoltWrapper` removes it automatically.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -221,10 +220,9 @@ DataStream<SplitStreamType<SomeType>> multiStream = ...
 
 SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>());
 
-// remove SplitStreamMapper to get data stream of type SomeType
-DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>).returns(SomeType.classs);
-// apply Bolt directly, without stripping SplitStreamType
-DataStream<BoltOutputType> s2 = splitStream.select("s2").transform(/* use Bolt for further processing */);
+// remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType
+DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
+DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
 
 // do further processing on s1 and s2
 [...]

http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
index 88d2dfe..febd56d 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
@@ -91,7 +91,7 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
 	 * @throws IllegalArgumentException
 	 *             If no output schema was declared for the specified stream or if more then 25 attributes got declared.
 	 */
-	TypeInformation<?> getOutputType(final String streamId) throws IllegalArgumentException {
+	TypeInformation<Tuple> getOutputType(final String streamId) throws IllegalArgumentException {
 		if (streamId == null) {
 			return null;
 		}
@@ -105,9 +105,7 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
 		Tuple t;
 		final int numberOfAttributes = outputSchema.size();
 
-		if (numberOfAttributes == 1) {
-			return TypeExtractor.getForClass(Object.class);
-		} else if (numberOfAttributes <= 25) {
+		if (numberOfAttributes <= 25) {
 			try {
 				t = Tuple.getTupleClass(numberOfAttributes).newInstance();
 			} catch (final InstantiationException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
index 9c41d88..8a88eac 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
@@ -33,9 +33,10 @@ import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamMapper;
 import org.apache.flink.storm.util.SplitStreamType;
-import org.apache.flink.storm.util.SplitStreamTypeKeySelector;
 import org.apache.flink.storm.util.StormStreamSelector;
 import org.apache.flink.storm.wrappers.BoltWrapper;
 import org.apache.flink.storm.wrappers.SpoutWrapper;
@@ -77,14 +78,13 @@ public class FlinkTopologyBuilder {
 	/**
 	 * Creates a Flink program that uses the specified spouts and bolts.
 	 */
-	@SuppressWarnings({"rawtypes", "unchecked"})
 	public FlinkTopology createTopology() {
 		this.stormTopology = this.stormBuilder.createTopology();
 
 		final FlinkTopology env = new FlinkTopology();
 		env.setParallelism(1);
 
-		final HashMap<String, HashMap<String, DataStream>> availableInputs = new HashMap<String, HashMap<String, DataStream>>();
+		final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<String, HashMap<String, DataStream<Tuple>>>();
 
 		for (final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
 			final String spoutId = spout.getKey();
@@ -96,24 +96,37 @@ public class FlinkTopologyBuilder {
 			this.outputStreams.put(spoutId, sourceStreams);
 			declarers.put(spoutId, declarer);
 
-			final SpoutWrapper spoutWrapper = new SpoutWrapper(userSpout);
-			spoutWrapper.setStormTopology(stormTopology);
 
-			DataStreamSource source;
-			final HashMap<String, DataStream> outputStreams = new HashMap<String, DataStream>();
+			final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
+			final DataStreamSource<?> source;
+
 			if (sourceStreams.size() == 1) {
+				final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout);
+				spoutWrapperSingleOutput.setStormTopology(stormTopology);
+
 				final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
-				source = env.addSource(spoutWrapper, spoutId,
+
+				DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
 						declarer.getOutputType(outputStreamId));
-				outputStreams.put(outputStreamId, source);
+
+				outputStreams.put(outputStreamId, src);
+				source = src;
 			} else {
-				source = env.addSource(spoutWrapper, spoutId,
-						TypeExtractor.getForClass(SplitStreamType.class));
-				SplitStream splitSource = source.split(new StormStreamSelector());
+				final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
+						userSpout);
+				spoutWrapperMultipleOutputs.setStormTopology(stormTopology);
+
+				@SuppressWarnings({ "unchecked", "rawtypes" })
+				DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource(
+						spoutWrapperMultipleOutputs, spoutId,
+						(TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));
 
+				SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
+						.split(new StormStreamSelector<Tuple>());
 				for (String streamId : sourceStreams.keySet()) {
-					outputStreams.put(streamId, splitSource.select(streamId));
+					outputStreams.put(streamId, splitSource.select(streamId).map(new SplitStreamMapper<Tuple>()));
 				}
+				source = multiSource;
 			}
 			availableInputs.put(spoutId, outputStreams);
 
@@ -171,11 +184,11 @@ public class FlinkTopologyBuilder {
 					final String producerId = stormInputStream.getKey().get_componentId();
 					final String inputStreamId = stormInputStream.getKey().get_streamId();
 
-					final HashMap<String, DataStream> producer = availableInputs.get(producerId);
+					final HashMap<String, DataStream<Tuple>> producer = availableInputs.get(producerId);
 					if (producer != null) {
 						makeProgress = true;
 
-						DataStream inputStream = producer.get(inputStreamId);
+						DataStream<Tuple> inputStream = producer.get(inputStreamId);
 						if (inputStream != null) {
 							final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
 							userBolt.declareOutputFields(declarer);
@@ -193,18 +206,9 @@ public class FlinkTopologyBuilder {
 								final List<String> fields = grouping.get_fields();
 								if (fields.size() > 0) {
 									FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
-									if (producer.size() == 1) {
-										inputStream = inputStream.keyBy(prodDeclarer
-												.getGroupingFieldIndexes(inputStreamId,
-														grouping.get_fields()));
-									} else {
-										inputStream = inputStream
-												.keyBy(new SplitStreamTypeKeySelector(
-														inputStream.getType(),
-														prodDeclarer.getGroupingFieldIndexes(
-																inputStreamId,
-																grouping.get_fields())));
-									}
+									inputStream = inputStream.keyBy(prodDeclarer
+											.getGroupingFieldIndexes(inputStreamId,
+													grouping.get_fields()));
 								} else {
 									inputStream = inputStream.global();
 								}
@@ -215,43 +219,56 @@ public class FlinkTopologyBuilder {
 										"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
 							}
 
-							final SingleOutputStreamOperator outputStream;
-							final BoltWrapper boltWrapper;
+							final SingleOutputStreamOperator<?, ?> outputStream;
+
 							if (boltOutputStreams.size() < 2) { // single output stream or sink
 								String outputStreamId = null;
 								if (boltOutputStreams.size() == 1) {
 									outputStreamId = (String) boltOutputStreams.keySet().toArray()[0];
 								}
-								final TypeInformation<?> outType = declarer
+								final TypeInformation<Tuple> outType = declarer
 										.getOutputType(outputStreamId);
 
-								boltWrapper = new BoltWrapper(userBolt, this.outputStreams
-										.get(producerId).get(inputStreamId));
-								outputStream = inputStream.transform(boltId, outType, boltWrapper);
+								final BoltWrapper<Tuple, Tuple> boltWrapperSingleOutput = new BoltWrapper<Tuple, Tuple>(
+										userBolt, this.outputStreams.get(producerId).get(
+												inputStreamId));
+								boltWrapperSingleOutput.setStormTopology(stormTopology);
+
+								final SingleOutputStreamOperator<Tuple, ?> outStream = inputStream
+										.transform(boltId, outType, boltWrapperSingleOutput);
 
 								if (outType != null) {
 									// only for non-sink nodes
-									final HashMap<String, DataStream> op = new HashMap<String, DataStream>();
-									op.put(outputStreamId, outputStream);
+									final HashMap<String, DataStream<Tuple>> op = new HashMap<String, DataStream<Tuple>>();
+									op.put(outputStreamId, outStream);
 									availableInputs.put(boltId, op);
 								}
+								outputStream = outStream;
 							} else {
-								final TypeInformation<?> outType = TypeExtractor
+								final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<Tuple, SplitStreamType<Tuple>>(
+										userBolt, this.outputStreams.get(producerId).get(
+												inputStreamId));
+								boltWrapperMultipleOutputs.setStormTopology(stormTopology);
+
+								@SuppressWarnings({ "unchecked", "rawtypes" })
+								final TypeInformation<SplitStreamType<Tuple>> outType = (TypeInformation) TypeExtractor
 										.getForClass(SplitStreamType.class);
 
-								boltWrapper = new BoltWrapper(userBolt, this.outputStreams.get(producerId).get(inputStreamId));
-								outputStream = inputStream.transform(boltId, outType, boltWrapper);
+								final SingleOutputStreamOperator<SplitStreamType<Tuple>, ?> multiStream = inputStream
+										.transform(boltId, outType, boltWrapperMultipleOutputs);
 
-								final SplitStream splitStreams = outputStream
-										.split(new StormStreamSelector());
+								final SplitStream<SplitStreamType<Tuple>> splitStream = multiStream
+										.split(new StormStreamSelector<Tuple>());
 
-								final HashMap<String, DataStream> op = new HashMap<String, DataStream>();
+								final HashMap<String, DataStream<Tuple>> op = new HashMap<String, DataStream<Tuple>>();
 								for (String outputStreamId : boltOutputStreams.keySet()) {
-									op.put(outputStreamId, splitStreams.select(outputStreamId));
+									op.put(outputStreamId,
+											splitStream.select(outputStreamId).map(
+													new SplitStreamMapper<Tuple>()));
 								}
 								availableInputs.put(boltId, op);
+								outputStream = multiStream;
 							}
-							boltWrapper.setStormTopology(stormTopology);
 
 							int dop = 1;
 							if (common.is_set_parallelism_hint()) {
@@ -342,7 +359,7 @@ public class FlinkTopologyBuilder {
 	 * 		the basic bolt
 	 * @param parallelism_hint
 	 * 		the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
-	 * 		process somwehere around the cluster.
+	 * 		process somewhere around the cluster.
 	 * @return use the returned object to declare the inputs to this component
 	 */
 	public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, final Number parallelism_hint) {
@@ -371,7 +388,7 @@ public class FlinkTopologyBuilder {
 	 * 		outputs.
 	 * @param parallelism_hint
 	 * 		the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
-	 * 		process somwehere around the cluster.
+	 * 		process somewhere around the cluster.
 	 * @param spout
 	 * 		the spout
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
deleted file mode 100644
index 71e5b86..0000000
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.storm.util;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
-
-/**
- * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via
- * {@link StormStreamSelector} from a Spout or Bolt that declares multiple output streams.
- * 
- * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} tuples and applies a regular
- * {@link ArrayKeySelector} on it.
- */
-public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> {
-	private static final long serialVersionUID = 4672434660037669254L;
-
-	private final ArrayKeySelector<Tuple> selector;
-
-	public SplitStreamTypeKeySelector(TypeInformation<Tuple> type, int... fields) {
-		this.selector = KeySelectorUtil.getSelectorForArray(fields, type);
-	}
-
-	@Override
-	public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
-		return selector.getKey(value.value);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index f0913e8..12d967a 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.storm.util.SplitStreamType;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -61,7 +60,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
 	private final Fields inputSchema;
 	/** The original Storm topology. */
 	protected StormTopology stormTopology;
-	
+
 	/**
 	 *  We have to use this because Operators must output
 	 *  {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
@@ -239,17 +238,11 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
 		this.bolt.cleanup();
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
 	public void processElement(final StreamRecord<IN> element) throws Exception {
 		this.flinkCollector.setTimestamp(element.getTimestamp());
 		IN value = element.getValue();
-		if (value instanceof SplitStreamType) {
-			this.bolt.execute(new StormTuple<IN>(((SplitStreamType<IN>) value).value,
-					inputSchema));
-		} else {
-			this.bolt.execute(new StormTuple<IN>(value, inputSchema));
-		}
+		this.bolt.execute(new StormTuple<IN>(value, inputSchema));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
index 49de476..8f0ad3b 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
@@ -20,7 +20,6 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer;
 import org.apache.flink.storm.util.AbstractTest;
 import org.junit.Assert;
@@ -30,8 +29,6 @@ import java.util.LinkedList;
 
 public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
 
-
-
 	@Test
 	public void testNull() {
 		Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null));
@@ -100,13 +97,8 @@ public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
 		for (String stream : streams) {
 			final TypeInformation<?> type = declarer.getOutputType(stream);
 
-			if (numberOfAttributes == 1) {
-				Assert.assertEquals(type.getClass(), GenericTypeInfo.class);
-				Assert.assertEquals(type.getTypeClass(), Object.class);
-			} else {
-				Assert.assertEquals(numberOfAttributes, type.getArity());
-				Assert.assertTrue(type.isTupleType());
-			}
+			Assert.assertEquals(numberOfAttributes, type.getArity());
+			Assert.assertTrue(type.isTupleType());
 		}
 	}