You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/09/14 17:59:27 UTC

flink git commit: [FLINK-2658] fieldsGrouping for multiple output streams fails - added SplitStreamTypeKeySelector and JUnit tests

Repository: flink
Updated Branches:
  refs/heads/master 8acd15d82 -> ce68cbd91


[FLINK-2658] fieldsGrouping for multiple output streams fails
 - added SplitStreamTypeKeySelector and JUnit tests

This closes #1122


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce68cbd9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce68cbd9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce68cbd9

Branch: refs/heads/master
Commit: ce68cbd91621b1a58cb34b33ab27762c6525cfa0
Parents: 8acd15d
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Fri Sep 11 13:52:04 2015 +0200
Committer: mjsax <mj...@apache.org>
Committed: Mon Sep 14 17:52:27 2015 +0200

----------------------------------------------------------------------
 .../api/FlinkTopologyBuilder.java               | 22 ++++---
 .../api/SplitStreamTypeKeySelector.java         | 47 ++++++++++++++
 .../api/FlinkTopologyBuilderTest.java           | 28 +++++++++
 .../stormcompatibility/util/TestDummyBolt.java  | 55 ++++++++++++++++
 .../stormcompatibility/util/TestDummySpout.java | 66 ++++++++++++++++++++
 .../flink/stormcompatibility/util/TestSink.java | 48 ++++++++++++++
 6 files changed, 259 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index a739c23..e4d880f 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -78,8 +78,8 @@ public class FlinkTopologyBuilder {
 	 */
 	@SuppressWarnings({"rawtypes", "unchecked"})
 	public FlinkTopology createTopology() {
-		final StormTopology stormTopolgoy = this.stormBuilder.createTopology();
-		final FlinkTopology env = new FlinkTopology(stormTopolgoy);
+		final StormTopology stormTopology = this.stormBuilder.createTopology();
+		final FlinkTopology env = new FlinkTopology(stormTopology);
 		env.setParallelism(1);
 
 		final HashMap<String, HashMap<String, DataStream>> availableInputs = new HashMap<String, HashMap<String, DataStream>>();
@@ -121,7 +121,7 @@ public class FlinkTopologyBuilder {
 			availableInputs.put(spoutId, outputStreams);
 
 			int dop = 1;
-			final ComponentCommon common = stormTopolgoy.get_spouts().get(spoutId).get_common();
+			final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
 			if (common.is_set_parallelism_hint()) {
 				dop = common.get_parallelism_hint();
 				source.setParallelism(dop);
@@ -155,7 +155,7 @@ public class FlinkTopologyBuilder {
 				final String boltId = bolt.getKey();
 				final IRichBolt userBolt = bolt.getValue();
 
-				final ComponentCommon common = stormTopolgoy.get_bolts().get(boltId).get_common();
+				final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
 
 				Set<Entry<GlobalStreamId, Grouping>> unprocessedInputs = unprocessdInputsPerBolt.get(boltId);
 				if (unprocessedInputs == null) {
@@ -194,9 +194,17 @@ public class FlinkTopologyBuilder {
 								final List<String> fields = grouping.get_fields();
 								if (fields.size() > 0) {
 									FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
-									inputStream = inputStream.groupBy(prodDeclarer
-											.getGroupingFieldIndexes(inputStreamId,
-													grouping.get_fields()));
+									if (producer.size() == 1) {
+										inputStream = inputStream.groupBy(prodDeclarer
+												.getGroupingFieldIndexes(inputStreamId,
+														grouping.get_fields()));
+									} else {
+										inputStream = inputStream
+												.groupBy(new SplitStreamTypeKeySelector(
+														prodDeclarer.getGroupingFieldIndexes(
+																inputStreamId,
+																grouping.get_fields())));
+									}
 								} else {
 									inputStream = inputStream.global();
 								}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
new file mode 100644
index 0000000..30227b8
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
+
+/**
+ * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via
+ * {@link FlinkStormStreamSelector} from a Spout or Bolt that declares multiple output streams.
+ * <p>
+ * It extracts the wrapped {@link Tuple} value from each {@link SplitStreamType} record and applies a regular
+ * {@link ArrayKeySelector} to it.
+ */
+public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> {
+	private static final long serialVersionUID = 4672434660037669254L;
+
+	private final ArrayKeySelector<Tuple> selector;
+
+	public SplitStreamTypeKeySelector(int... fields) {
+		this.selector = new KeySelectorUtil.ArrayKeySelector<Tuple>(fields);
+	}
+
+	@Override
+	public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
+		return selector.getKey(value.value);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
index 0187020..0dd9b1c 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
@@ -16,8 +16,13 @@
  */
 package org.apache.flink.stormcompatibility.api;
 
+import org.apache.flink.stormcompatibility.util.TestDummyBolt;
+import org.apache.flink.stormcompatibility.util.TestDummySpout;
+import org.apache.flink.stormcompatibility.util.TestSink;
 import org.junit.Test;
 
+import backtype.storm.tuple.Fields;
+
 public class FlinkTopologyBuilderTest {
 
 	@Test(expected = RuntimeException.class)
@@ -45,4 +50,27 @@ public class FlinkTopologyBuilderTest {
 		builder.createTopology();
 	}
 
+	@Test
+	public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
+		FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
+
+		flinkBuilder.setSpout("spout", new TestDummySpout());
+		flinkBuilder.setBolt("sink", new TestSink()).fieldsGrouping("spout",
+				TestDummySpout.spoutStreamId, new Fields("id"));
+
+		flinkBuilder.createTopology();
+	}
+
+	@Test
+	public void testFieldsGroupingOnMultipleBoltOutputStreams() {
+		FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
+
+		flinkBuilder.setSpout("spout", new TestDummySpout());
+		flinkBuilder.setBolt("bolt", new TestDummyBolt()).shuffleGrouping("spout");
+		flinkBuilder.setBolt("sink", new TestSink()).fieldsGrouping("bolt",
+				TestDummyBolt.groupingStreamId, new Fields("id"));
+
+		flinkBuilder.createTopology();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
new file mode 100644
index 0000000..ec48719
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+
+public class TestDummyBolt implements IRichBolt {
+	private static final long serialVersionUID = 6893611247443121322L;
+
+	public final static String shuffleStreamId = "shuffleStream";
+	public final static String groupingStreamId = "groupingStream";
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}
+
+	@Override
+	public void execute(Tuple input) {}
+
+	@Override
+	public void cleanup() {}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declareStream(shuffleStreamId, new Fields("data"));
+		declarer.declareStream(groupingStreamId, new Fields("id", "data"));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
new file mode 100644
index 0000000..62705b8
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.Map;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+public class TestDummySpout implements IRichSpout {
+	private static final long serialVersionUID = -5190945609124603118L;
+
+	public final static String spoutStreamId = "spout-stream";
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
+
+	@Override
+	public void close() {}
+
+	@Override
+	public void activate() {}
+
+	@Override
+	public void deactivate() {}
+
+	@Override
+	public void nextTuple() {}
+
+	@Override
+	public void ack(Object msgId) {}
+
+	@Override
+	public void fail(Object msgId) {}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields("data"));
+		declarer.declareStream(spoutStreamId, new Fields("id", "data"));
+	}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce68cbd9/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
new file mode 100644
index 0000000..5699219
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+public class TestSink implements IRichBolt {
+	private static final long serialVersionUID = 4314871456719370877L;
+
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}
+
+	@Override
+	public void execute(Tuple input) {}
+
+	@Override
+	public void cleanup() {}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {}
+
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}