You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/29 14:15:25 UTC

flink git commit: Fix [FLINK-2391] Storm-compatibility:method FlinkTopologyBuilder.createTopology() throws java.lang.NullPointerException bug

Repository: flink
Updated Branches:
  refs/heads/master 8f87b7164 -> ada9037be


Fix [FLINK-2391] Storm-compatibility:method FlinkTopologyBuilder.createTopology() throws java.lang.NullPointerException bug

This closes #940


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ada9037b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ada9037b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ada9037b

Branch: refs/heads/master
Commit: ada9037bef760d46a4c3be2177e04bd72e620dad
Parents: 8f87b71
Author: root <ro...@linux-3lsu.site>
Authored: Mon Jul 27 16:54:05 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 29 11:14:04 2015 +0200

----------------------------------------------------------------------
 .../flink/stormcompatibility/api/FlinkTopologyBuilder.java    | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ada9037b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index 6c39561..4ecf4a6 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -64,6 +64,8 @@ public class FlinkTopologyBuilder {
 	private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
 	/** All declared output schemas by operator ID */
 	private final HashMap<String, Fields> outputSchemas = new HashMap<String, Fields>();
+	/** All spouts&bolts declarers by their ID */
+	private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
 
 	/**
 	 * Creates a Flink program that used the specified spouts and bolts.
@@ -84,6 +86,7 @@ public class FlinkTopologyBuilder {
 			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
 			userSpout.declareOutputFields(declarer);
 			this.outputSchemas.put(spoutId, declarer.outputSchema);
+			declarers.put(spoutId, declarer);
 
 			/* TODO in order to support multiple output streams, use an additional wrapper (or modify StormSpoutWrapper
 			 * and StormCollector)
@@ -124,6 +127,7 @@ public class FlinkTopologyBuilder {
 				final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
 				userBolt.declareOutputFields(declarer);
 				this.outputSchemas.put(boltId, declarer.outputSchema);
+				declarers.put(boltId, declarer);
 
 				final ComponentCommon common = stormTopolgoy.get_bolts().get(boltId).get_common();
 
@@ -153,7 +157,8 @@ public class FlinkTopologyBuilder {
 							// global grouping is emulated in Storm via an empty fields grouping list
 							final List<String> fields = grouping.get_fields();
 							if (fields.size() > 0) {
-								inputDataStream = inputDataStream.groupBy(declarer.getGroupingFieldIndexes(grouping
+								FlinkOutputFieldsDeclarer procDeclarer = this.declarers.get(producerId);
+								inputDataStream = inputDataStream.groupBy(procDeclarer.getGroupingFieldIndexes(grouping
 										.get_fields()));
 							} else {
 								inputDataStream = inputDataStream.global();