You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/24 15:45:14 UTC
flink git commit: [FLINK-2862] [Storm Compatibility]
FlinkTopologyBuilder should use proper generic types
Repository: flink
Updated Branches:
refs/heads/master 4a3151681 -> fa88d9eb1
[FLINK-2862] [Storm Compatibility] FlinkTopologyBuilder should use proper generic types
This closes #1274
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa88d9eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa88d9eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa88d9eb
Branch: refs/heads/master
Commit: fa88d9eb1b88641d6ca03b38e2058d30971c7a2f
Parents: 4a31516
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Tue Oct 20 18:06:00 2015 +0200
Committer: mjsax <mj...@apache.org>
Committed: Sat Oct 24 13:04:10 2015 +0200
----------------------------------------------------------------------
docs/apis/storm_compatibility.md | 8 +-
.../storm/api/FlinkOutputFieldsDeclarer.java | 6 +-
.../flink/storm/api/FlinkTopologyBuilder.java | 107 +++++++++++--------
.../storm/util/SplitStreamTypeKeySelector.java | 47 --------
.../flink/storm/wrappers/BoltWrapper.java | 11 +-
.../api/FlinkOutputFieldsDeclarerTest.java | 12 +--
6 files changed, 71 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index bf80d4e..e3e11ab 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -209,7 +209,6 @@ If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., ther
For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitStream.select(...)`.
Flink provides the predefined output selector `StormStreamSelector<T>` for `.split(...)` already.
Furthermore, the wrapper type `SplitStreamTuple<T>` can be removed using `SplitStreamMapper<T>`.
-If a data stream of type `SplitStreamTuple<T>` is used as input for a Bolt, it is **not** required to strip the wrapper – `BoltWrapper` removes it automatically.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -221,10 +220,9 @@ DataStream<SplitStreamType<SomeType>> multiStream = ...
SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>());
-// remove SplitStreamMapper to get data stream of type SomeType
-DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>).returns(SomeType.classs);
-// apply Bolt directly, without stripping SplitStreamType
-DataStream<BoltOutputType> s2 = splitStream.select("s2").transform(/* use Bolt for further processing */);
+// remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType
+DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
+DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
// do further processing on s1 and s2
[...]
http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
index 88d2dfe..febd56d 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
@@ -91,7 +91,7 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
* @throws IllegalArgumentException
* If no output schema was declared for the specified stream or if more than 25 attributes got declared.
*/
- TypeInformation<?> getOutputType(final String streamId) throws IllegalArgumentException {
+ TypeInformation<Tuple> getOutputType(final String streamId) throws IllegalArgumentException {
if (streamId == null) {
return null;
}
@@ -105,9 +105,7 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
Tuple t;
final int numberOfAttributes = outputSchema.size();
- if (numberOfAttributes == 1) {
- return TypeExtractor.getForClass(Object.class);
- } else if (numberOfAttributes <= 25) {
+ if (numberOfAttributes <= 25) {
try {
t = Tuple.getTupleClass(numberOfAttributes).newInstance();
} catch (final InstantiationException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
index 9c41d88..8a88eac 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
@@ -33,9 +33,10 @@ import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamMapper;
import org.apache.flink.storm.util.SplitStreamType;
-import org.apache.flink.storm.util.SplitStreamTypeKeySelector;
import org.apache.flink.storm.util.StormStreamSelector;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.storm.wrappers.SpoutWrapper;
@@ -77,14 +78,13 @@ public class FlinkTopologyBuilder {
/**
* Creates a Flink program that uses the specified spouts and bolts.
*/
- @SuppressWarnings({"rawtypes", "unchecked"})
public FlinkTopology createTopology() {
this.stormTopology = this.stormBuilder.createTopology();
final FlinkTopology env = new FlinkTopology();
env.setParallelism(1);
- final HashMap<String, HashMap<String, DataStream>> availableInputs = new HashMap<String, HashMap<String, DataStream>>();
+ final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<String, HashMap<String, DataStream<Tuple>>>();
for (final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
final String spoutId = spout.getKey();
@@ -96,24 +96,37 @@ public class FlinkTopologyBuilder {
this.outputStreams.put(spoutId, sourceStreams);
declarers.put(spoutId, declarer);
- final SpoutWrapper spoutWrapper = new SpoutWrapper(userSpout);
- spoutWrapper.setStormTopology(stormTopology);
- DataStreamSource source;
- final HashMap<String, DataStream> outputStreams = new HashMap<String, DataStream>();
+ final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
+ final DataStreamSource<?> source;
+
if (sourceStreams.size() == 1) {
+ final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout);
+ spoutWrapperSingleOutput.setStormTopology(stormTopology);
+
final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
- source = env.addSource(spoutWrapper, spoutId,
+
+ DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
declarer.getOutputType(outputStreamId));
- outputStreams.put(outputStreamId, source);
+
+ outputStreams.put(outputStreamId, src);
+ source = src;
} else {
- source = env.addSource(spoutWrapper, spoutId,
- TypeExtractor.getForClass(SplitStreamType.class));
- SplitStream splitSource = source.split(new StormStreamSelector());
+ final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
+ userSpout);
+ spoutWrapperMultipleOutputs.setStormTopology(stormTopology);
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource(
+ spoutWrapperMultipleOutputs, spoutId,
+ (TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));
+ SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
+ .split(new StormStreamSelector<Tuple>());
for (String streamId : sourceStreams.keySet()) {
- outputStreams.put(streamId, splitSource.select(streamId));
+ outputStreams.put(streamId, splitSource.select(streamId).map(new SplitStreamMapper<Tuple>()));
}
+ source = multiSource;
}
availableInputs.put(spoutId, outputStreams);
@@ -171,11 +184,11 @@ public class FlinkTopologyBuilder {
final String producerId = stormInputStream.getKey().get_componentId();
final String inputStreamId = stormInputStream.getKey().get_streamId();
- final HashMap<String, DataStream> producer = availableInputs.get(producerId);
+ final HashMap<String, DataStream<Tuple>> producer = availableInputs.get(producerId);
if (producer != null) {
makeProgress = true;
- DataStream inputStream = producer.get(inputStreamId);
+ DataStream<Tuple> inputStream = producer.get(inputStreamId);
if (inputStream != null) {
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
userBolt.declareOutputFields(declarer);
@@ -193,18 +206,9 @@ public class FlinkTopologyBuilder {
final List<String> fields = grouping.get_fields();
if (fields.size() > 0) {
FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
- if (producer.size() == 1) {
- inputStream = inputStream.keyBy(prodDeclarer
- .getGroupingFieldIndexes(inputStreamId,
- grouping.get_fields()));
- } else {
- inputStream = inputStream
- .keyBy(new SplitStreamTypeKeySelector(
- inputStream.getType(),
- prodDeclarer.getGroupingFieldIndexes(
- inputStreamId,
- grouping.get_fields())));
- }
+ inputStream = inputStream.keyBy(prodDeclarer
+ .getGroupingFieldIndexes(inputStreamId,
+ grouping.get_fields()));
} else {
inputStream = inputStream.global();
}
@@ -215,43 +219,56 @@ public class FlinkTopologyBuilder {
"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
}
- final SingleOutputStreamOperator outputStream;
- final BoltWrapper boltWrapper;
+ final SingleOutputStreamOperator<?, ?> outputStream;
+
if (boltOutputStreams.size() < 2) { // single output stream or sink
String outputStreamId = null;
if (boltOutputStreams.size() == 1) {
outputStreamId = (String) boltOutputStreams.keySet().toArray()[0];
}
- final TypeInformation<?> outType = declarer
+ final TypeInformation<Tuple> outType = declarer
.getOutputType(outputStreamId);
- boltWrapper = new BoltWrapper(userBolt, this.outputStreams
- .get(producerId).get(inputStreamId));
- outputStream = inputStream.transform(boltId, outType, boltWrapper);
+ final BoltWrapper<Tuple, Tuple> boltWrapperSingleOutput = new BoltWrapper<Tuple, Tuple>(
+ userBolt, this.outputStreams.get(producerId).get(
+ inputStreamId));
+ boltWrapperSingleOutput.setStormTopology(stormTopology);
+
+ final SingleOutputStreamOperator<Tuple, ?> outStream = inputStream
+ .transform(boltId, outType, boltWrapperSingleOutput);
if (outType != null) {
// only for non-sink nodes
- final HashMap<String, DataStream> op = new HashMap<String, DataStream>();
- op.put(outputStreamId, outputStream);
+ final HashMap<String, DataStream<Tuple>> op = new HashMap<String, DataStream<Tuple>>();
+ op.put(outputStreamId, outStream);
availableInputs.put(boltId, op);
}
+ outputStream = outStream;
} else {
- final TypeInformation<?> outType = TypeExtractor
+ final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<Tuple, SplitStreamType<Tuple>>(
+ userBolt, this.outputStreams.get(producerId).get(
+ inputStreamId));
+ boltWrapperMultipleOutputs.setStormTopology(stormTopology);
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ final TypeInformation<SplitStreamType<Tuple>> outType = (TypeInformation) TypeExtractor
.getForClass(SplitStreamType.class);
- boltWrapper = new BoltWrapper(userBolt, this.outputStreams.get(producerId).get(inputStreamId));
- outputStream = inputStream.transform(boltId, outType, boltWrapper);
+ final SingleOutputStreamOperator<SplitStreamType<Tuple>, ?> multiStream = inputStream
+ .transform(boltId, outType, boltWrapperMultipleOutputs);
- final SplitStream splitStreams = outputStream
- .split(new StormStreamSelector());
+ final SplitStream<SplitStreamType<Tuple>> splitStream = multiStream
+ .split(new StormStreamSelector<Tuple>());
- final HashMap<String, DataStream> op = new HashMap<String, DataStream>();
+ final HashMap<String, DataStream<Tuple>> op = new HashMap<String, DataStream<Tuple>>();
for (String outputStreamId : boltOutputStreams.keySet()) {
- op.put(outputStreamId, splitStreams.select(outputStreamId));
+ op.put(outputStreamId,
+ splitStream.select(outputStreamId).map(
+ new SplitStreamMapper<Tuple>()));
}
availableInputs.put(boltId, op);
+ outputStream = multiStream;
}
- boltWrapper.setStormTopology(stormTopology);
int dop = 1;
if (common.is_set_parallelism_hint()) {
@@ -342,7 +359,7 @@ public class FlinkTopologyBuilder {
* the basic bolt
* @param parallelism_hint
* the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
- * process somwehere around the cluster.
+ * process somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
*/
public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, final Number parallelism_hint) {
@@ -371,7 +388,7 @@ public class FlinkTopologyBuilder {
* outputs.
* @param parallelism_hint
* the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
- * process somwehere around the cluster.
+ * process somewhere around the cluster.
* @param spout
* the spout
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
deleted file mode 100644
index 71e5b86..0000000
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.storm.util;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
-
-/**
- * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via
- * {@link StormStreamSelector} from a Spout or Bolt that declares multiple output streams.
- *
- * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} tuples and applies a regular
- * {@link ArrayKeySelector} on it.
- */
-public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> {
- private static final long serialVersionUID = 4672434660037669254L;
-
- private final ArrayKeySelector<Tuple> selector;
-
- public SplitStreamTypeKeySelector(TypeInformation<Tuple> type, int... fields) {
- this.selector = KeySelectorUtil.getSelectorForArray(fields, type);
- }
-
- @Override
- public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
- return selector.getKey(value.value);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index f0913e8..12d967a 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -61,7 +60,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
private final Fields inputSchema;
/** The original Storm topology. */
protected StormTopology stormTopology;
-
+
/**
* We have to use this because Operators must output
* {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
@@ -239,17 +238,11 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
this.bolt.cleanup();
}
- @SuppressWarnings("unchecked")
@Override
public void processElement(final StreamRecord<IN> element) throws Exception {
this.flinkCollector.setTimestamp(element.getTimestamp());
IN value = element.getValue();
- if (value instanceof SplitStreamType) {
- this.bolt.execute(new StormTuple<IN>(((SplitStreamType<IN>) value).value,
- inputSchema));
- } else {
- this.bolt.execute(new StormTuple<IN>(value, inputSchema));
- }
+ this.bolt.execute(new StormTuple<IN>(value, inputSchema));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
index 49de476..8f0ad3b 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
@@ -20,7 +20,6 @@ import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer;
import org.apache.flink.storm.util.AbstractTest;
import org.junit.Assert;
@@ -30,8 +29,6 @@ import java.util.LinkedList;
public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
-
-
@Test
public void testNull() {
Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null));
@@ -100,13 +97,8 @@ public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
for (String stream : streams) {
final TypeInformation<?> type = declarer.getOutputType(stream);
- if (numberOfAttributes == 1) {
- Assert.assertEquals(type.getClass(), GenericTypeInfo.class);
- Assert.assertEquals(type.getTypeClass(), Object.class);
- } else {
- Assert.assertEquals(numberOfAttributes, type.getArity());
- Assert.assertTrue(type.isTupleType());
- }
+ Assert.assertEquals(numberOfAttributes, type.getArity());
+ Assert.assertTrue(type.isTupleType());
}
}