You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/29 14:15:25 UTC
flink git commit: [FLINK-2391] [storm-compatibility] Fix the java.lang.NullPointerException
thrown by the method FlinkTopologyBuilder.createTopology()
Repository: flink
Updated Branches:
refs/heads/master 8f87b7164 -> ada9037be
[FLINK-2391] [storm-compatibility] Fix the java.lang.NullPointerException thrown by the method FlinkTopologyBuilder.createTopology()
This closes #940
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ada9037b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ada9037b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ada9037b
Branch: refs/heads/master
Commit: ada9037bef760d46a4c3be2177e04bd72e620dad
Parents: 8f87b71
Author: root <ro...@linux-3lsu.site>
Authored: Mon Jul 27 16:54:05 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 29 11:14:04 2015 +0200
----------------------------------------------------------------------
.../flink/stormcompatibility/api/FlinkTopologyBuilder.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ada9037b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index 6c39561..4ecf4a6 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -64,6 +64,8 @@ public class FlinkTopologyBuilder {
private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
/** All declared output schemas by operator ID */
private final HashMap<String, Fields> outputSchemas = new HashMap<String, Fields>();
+ /** All spout and bolt declarers, keyed by their ID */
+ private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
/**
* Creates a Flink program that used the specified spouts and bolts.
@@ -84,6 +86,7 @@ public class FlinkTopologyBuilder {
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
userSpout.declareOutputFields(declarer);
this.outputSchemas.put(spoutId, declarer.outputSchema);
+ declarers.put(spoutId, declarer);
/* TODO in order to support multiple output streams, use an additional wrapper (or modify StormSpoutWrapper
* and StormCollector)
@@ -124,6 +127,7 @@ public class FlinkTopologyBuilder {
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
userBolt.declareOutputFields(declarer);
this.outputSchemas.put(boltId, declarer.outputSchema);
+ declarers.put(boltId, declarer);
final ComponentCommon common = stormTopolgoy.get_bolts().get(boltId).get_common();
@@ -153,7 +157,8 @@ public class FlinkTopologyBuilder {
// global grouping is emulated in Storm via an empty fields grouping list
final List<String> fields = grouping.get_fields();
if (fields.size() > 0) {
- inputDataStream = inputDataStream.groupBy(declarer.getGroupingFieldIndexes(grouping
+ FlinkOutputFieldsDeclarer procDeclarer = this.declarers.get(producerId);
+ inputDataStream = inputDataStream.groupBy(procDeclarer.getGroupingFieldIndexes(grouping
.get_fields()));
} else {
inputDataStream = inputDataStream.global();