You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this rendering.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/09/14 17:59:27 UTC
flink git commit: [FLINK-2658] fieldsGrouping for multiple output streams fails - added SplitStreamTypeKeySelector and JUnit tests
Repository: flink
Updated Branches:
refs/heads/master 8acd15d82 -> ce68cbd91
[FLINK-2658] fieldsGrouping for multiple output streams fails
- added SplitStreamTypeKeySelector and JUnit tests
This closes #1122
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce68cbd9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce68cbd9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce68cbd9
Branch: refs/heads/master
Commit: ce68cbd91621b1a58cb34b33ab27762c6525cfa0
Parents: 8acd15d
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Fri Sep 11 13:52:04 2015 +0200
Committer: mjsax <mj...@apache.org>
Committed: Mon Sep 14 17:52:27 2015 +0200
----------------------------------------------------------------------
.../api/FlinkTopologyBuilder.java | 22 ++++---
.../api/SplitStreamTypeKeySelector.java | 47 ++++++++++++++
.../api/FlinkTopologyBuilderTest.java | 28 +++++++++
.../stormcompatibility/util/TestDummyBolt.java | 55 ++++++++++++++++
.../stormcompatibility/util/TestDummySpout.java | 66 ++++++++++++++++++++
.../flink/stormcompatibility/util/TestSink.java | 48 ++++++++++++++
6 files changed, 259 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index a739c23..e4d880f 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -78,8 +78,8 @@ public class FlinkTopologyBuilder {
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public FlinkTopology createTopology() {
- final StormTopology stormTopolgoy = this.stormBuilder.createTopology();
- final FlinkTopology env = new FlinkTopology(stormTopolgoy);
+ final StormTopology stormTopology = this.stormBuilder.createTopology();
+ final FlinkTopology env = new FlinkTopology(stormTopology);
env.setParallelism(1);
final HashMap<String, HashMap<String, DataStream>> availableInputs = new HashMap<String, HashMap<String, DataStream>>();
@@ -121,7 +121,7 @@ public class FlinkTopologyBuilder {
availableInputs.put(spoutId, outputStreams);
int dop = 1;
- final ComponentCommon common = stormTopolgoy.get_spouts().get(spoutId).get_common();
+ final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
if (common.is_set_parallelism_hint()) {
dop = common.get_parallelism_hint();
source.setParallelism(dop);
@@ -155,7 +155,7 @@ public class FlinkTopologyBuilder {
final String boltId = bolt.getKey();
final IRichBolt userBolt = bolt.getValue();
- final ComponentCommon common = stormTopolgoy.get_bolts().get(boltId).get_common();
+ final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
Set<Entry<GlobalStreamId, Grouping>> unprocessedInputs = unprocessdInputsPerBolt.get(boltId);
if (unprocessedInputs == null) {
@@ -194,9 +194,17 @@ public class FlinkTopologyBuilder {
final List<String> fields = grouping.get_fields();
if (fields.size() > 0) {
FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
- inputStream = inputStream.groupBy(prodDeclarer
- .getGroupingFieldIndexes(inputStreamId,
- grouping.get_fields()));
+ if (producer.size() == 1) {
+ inputStream = inputStream.groupBy(prodDeclarer
+ .getGroupingFieldIndexes(inputStreamId,
+ grouping.get_fields()));
+ } else {
+ inputStream = inputStream
+ .groupBy(new SplitStreamTypeKeySelector(
+ prodDeclarer.getGroupingFieldIndexes(
+ inputStreamId,
+ grouping.get_fields())));
+ }
} else {
inputStream = inputStream.global();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
new file mode 100644
index 0000000..30227b8
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
+
+/**
+ * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via
+ * {@link FlinkStormStreamSelector} from a Spout or Bolt that declares multiple output streams.
+ *
+ * <p>It extracts the wrapped {@link Tuple} from each {@link SplitStreamType} record and applies a regular
+ * {@link ArrayKeySelector} to it.
+ */
+public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> {
+	private static final long serialVersionUID = 4672434660037669254L;
+
+	/** Selector applied to the tuple wrapped inside each {@link SplitStreamType} record. */
+	private final ArrayKeySelector<Tuple> selector;
+
+	/**
+	 * Creates a key selector that extracts the fields at the given positions of the wrapped tuple.
+	 *
+	 * @param fields
+	 *            the indexes of the key fields within the wrapped {@link Tuple}
+	 */
+	public SplitStreamTypeKeySelector(int... fields) {
+		this.selector = new ArrayKeySelector<Tuple>(fields);
+	}
+
+	/**
+	 * Returns the key of the tuple wrapped by {@code value}.
+	 *
+	 * @param value
+	 *            a record of a split stream
+	 * @return the key computed by the wrapped {@link ArrayKeySelector} on {@code value.value}
+	 */
+	@Override
+	public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
+		return selector.getKey(value.value);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
index 0187020..0dd9b1c 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
@@ -16,8 +16,13 @@
*/
package org.apache.flink.stormcompatibility.api;
+import org.apache.flink.stormcompatibility.util.TestDummyBolt;
+import org.apache.flink.stormcompatibility.util.TestDummySpout;
+import org.apache.flink.stormcompatibility.util.TestSink;
import org.junit.Test;
+import backtype.storm.tuple.Fields;
+
public class FlinkTopologyBuilderTest {
@Test(expected = RuntimeException.class)
@@ -45,4 +50,35 @@ public class FlinkTopologyBuilderTest {
builder.createTopology();
}
+	/**
+	 * Regression test for FLINK-2658: fields grouping on one of several output streams declared by a spout.
+	 * The test passes if {@link FlinkTopologyBuilder#createTopology()} completes without throwing.
+	 */
+	@Test
+	public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
+		FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
+
+		flinkBuilder.setSpout("spout", new TestDummySpout());
+		flinkBuilder.setBolt("sink", new TestSink()).fieldsGrouping("spout",
+				TestDummySpout.spoutStreamId, new Fields("id"));
+
+		flinkBuilder.createTopology();
+	}
+
+	/**
+	 * Regression test for FLINK-2658: fields grouping on one of several output streams declared by a bolt.
+	 * The test passes if {@link FlinkTopologyBuilder#createTopology()} completes without throwing.
+	 */
+	@Test
+	public void testFieldsGroupingOnMultipleBoltOutputStreams() {
+		FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
+
+		flinkBuilder.setSpout("spout", new TestDummySpout());
+		flinkBuilder.setBolt("bolt", new TestDummyBolt()).shuffleGrouping("spout");
+		flinkBuilder.setBolt("sink", new TestSink()).fieldsGrouping("bolt",
+				TestDummyBolt.groupingStreamId, new Fields("id"));
+
+		flinkBuilder.createTopology();
+	}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
new file mode 100644
index 0000000..ec48719
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Test bolt that emits nothing but declares two output streams with different schemas, so that a
+ * downstream component can subscribe to one of several streams of the same producer.
+ */
+public class TestDummyBolt implements IRichBolt {
+	private static final long serialVersionUID = 6893611247443121322L;
+
+	/** Id of the output stream declared with the single field {@code "data"}. */
+	public static final String shuffleStreamId = "shuffleStream";
+	/** Id of the output stream declared with the fields {@code "id"} and {@code "data"}. */
+	public static final String groupingStreamId = "groupingStream";
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}
+
+	@Override
+	public void execute(Tuple input) {}
+
+	@Override
+	public void cleanup() {}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declareStream(shuffleStreamId, new Fields("data"));
+		declarer.declareStream(groupingStreamId, new Fields("id", "data"));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
new file mode 100644
index 0000000..62705b8
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.Map;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+/**
+ * Test spout that emits nothing but declares two output streams with different schemas: the default
+ * stream (single field {@code "data"}) and {@link #spoutStreamId}.
+ */
+public class TestDummySpout implements IRichSpout {
+	private static final long serialVersionUID = -5190945609124603118L;
+
+	/** Id of the additional output stream declared with the fields {@code "id"} and {@code "data"}. */
+	public static final String spoutStreamId = "spout-stream";
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
+
+	@Override
+	public void close() {}
+
+	@Override
+	public void activate() {}
+
+	@Override
+	public void deactivate() {}
+
+	@Override
+	public void nextTuple() {}
+
+	@Override
+	public void ack(Object msgId) {}
+
+	@Override
+	public void fail(Object msgId) {}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields("data"));
+		declarer.declareStream(spoutStreamId, new Fields("id", "data"));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
new file mode 100644
index 0000000..5699219
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Terminal test bolt: it declares no output streams and ignores every tuple it receives.
+ */
+public class TestSink implements IRichBolt {
+	private static final long serialVersionUID = 4314871456719370877L;
+
+	/** Declares nothing: this bolt has no output streams. */
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {}
+
+	/** Returns {@code null}: no component-specific configuration is provided. */
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}
+
+	/** Discards {@code input}. */
+	@Override
+	public void execute(Tuple input) {}
+
+	@Override
+	public void cleanup() {}
+
+}