You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/09/28 11:11:39 UTC

[4/4] flink git commit: [FLINK-6549] [datastream] Improve error message for type mismatches with side outputs

[FLINK-6549] [datastream] Improve error message for type mismatches with side outputs

This closes #4663.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/566b7471
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/566b7471
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/566b7471

Branch: refs/heads/release-1.3
Commit: 566b7471d7f211eaee6116643dd2912d52dcb625
Parents: 02f9e26
Author: Bowen Li <bo...@gmail.com>
Authored: Sat Sep 9 00:03:13 2017 -0700
Committer: zentol <ch...@apache.org>
Committed: Thu Sep 28 13:10:23 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/tasks/OperatorChain.java      | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/566b7471/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 0875279..68dedfb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -526,6 +526,16 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 				StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
 				operator.setKeyContextElement1(copy);
 				operator.processElement(copy);
+			} catch (ClassCastException e) {
+				// Enrich error message
+				ClassCastException replace = new ClassCastException(
+						String.format("%s. Failed to push OutputTag with id '%s' to operator. " +
+								"This can occur when multiple OutputTags with different types " +
+								"but identical names are being used.",
+								e.getMessage(), outputTag.getId()));
+
+				throw new ExceptionInChainedOperatorException(replace);
+
 			} catch (Exception e) {
 				throw new ExceptionInChainedOperatorException(e);
 			}