Posted to issues@flink.apache.org by "Chesnay Schepler (Jira)" <ji...@apache.org> on 2020/08/14 10:00:17 UTC
[jira] [Commented] (FLINK-18961) In the case of FlatMap linking map, if map returns null, an exception will be thrown in FlatMap
[ https://issues.apache.org/jira/browse/FLINK-18961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177670#comment-17177670 ]
Chesnay Schepler commented on FLINK-18961:
------------------------------------------
This is indeed a bit unintuitive. Realistically, however, it will not be changed, since the DataSet API is about to be subsumed.
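Note that even if the map function were not chained, your try-catch block would still not work, because the null check happens only after the map function has returned, outside its try-catch block.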
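To illustrate why the try-catch cannot help, here is a simplified plain-Java model of the order of operations visible in the stack trace (ChainedMapDriver.collect calling OutputCollector.collect): the chained map step first calls the user function and only afterwards null-checks the returned record before forwarding it. The class and method names are hypothetical and are not Flink's actual classes; this is a sketch of the control flow, not the runtime's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified model of a chained map step. The names here are hypothetical and
// are NOT Flink's ChainedMapDriver/OutputCollector; they only mimic the order
// of operations: run the user function, then null-check and forward its result.
public class ChainedMapModel {

    // Builds the collector that the upstream operator (the flatMap's `out`)
    // would call for every record it emits.
    static <I, O> Consumer<I> chainedMap(Function<I, O> userMap, Consumer<O> downstream) {
        return record -> {
            O result = userMap.apply(record); // user code, including its try-catch, runs here
            if (result == null) {             // the null check only happens after map() returned
                throw new NullPointerException(
                        "The system does not support records that are null.");
            }
            downstream.accept(result);
        };
    }

    // Returns true if the NPE escapes to the upstream collect() call.
    static boolean npeSurfacesUpstream(String input) {
        // Mirrors the reporter's map: a lookup that may "successfully" yield null,
        // wrapped in a try-catch that never sees the later NPE.
        Function<String, String> userMap = value -> {
            try {
                return value.isEmpty() ? null : value;
            } catch (RuntimeException e) {
                return value;
            }
        };
        List<String> sink = new ArrayList<>();
        Consumer<String> out = chainedMap(userMap, sink::add);
        try {
            out.accept(input); // plays the role of out.collect(match) in the flatMap
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(npeSurfacesUpstream(""));    // true: map returned null
        System.out.println(npeSurfacesUpstream("abc")); // false: non-null record forwarded
    }
}
```

Returning null is not an exception inside the map function itself, so the catch branch never runs; the failure only appears once the record is handed on, which in the chained case is inside the upstream flatMap's `out.collect(...)` call.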
Essentially, returning null anywhere is undefined behavior, so I'd suggest explicitly checking for null in your map function.
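A minimal plain-Java sketch of that suggestion, assuming a hypothetical `extract` function that stands in for the reporter's JSON lookup. It shows two shapes: keep a map and fall back to a non-null value, or use a flatMap-style function that emits a record only when the result is non-null (in Flink, a `FlatMapFunction` that calls `out.collect(...)` only for non-null results). These classes only model the two shapes and are not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java sketch of two ways to avoid emitting null records. `extract` is a
// hypothetical stand-in for the JSON lookup in the original map function.
public class NullSafeMapSketch {

    // Option 1 (map-style): always return exactly one non-null record by
    // falling back to the input when the lookup fails or yields null.
    static String mapWithFallback(String value, Function<String, String> extract) {
        String result;
        try {
            result = extract.apply(value);
        } catch (RuntimeException e) {
            return value;
        }
        return result != null ? result : value; // explicit null check
    }

    // Option 2 (flatMap-style): emit zero or one record, skipping nulls.
    // In Flink this shape corresponds to a FlatMapFunction calling out.collect(...).
    static void flatMapSkippingNulls(String value, Function<String, String> extract,
                                     Consumer<String> out) {
        String result;
        try {
            result = extract.apply(value);
        } catch (RuntimeException e) {
            result = value; // same fallback as the original map
        }
        if (result != null) {
            out.accept(result); // null results are simply not emitted
        }
    }

    public static void main(String[] args) {
        Function<String, String> alwaysNull = s -> null;
        System.out.println(mapWithFallback("raw", alwaysNull)); // raw
        List<String> sink = new ArrayList<>();
        flatMapSkippingNulls("raw", alwaysNull, sink::add);
        flatMapSkippingNulls("raw", String::toUpperCase, sink::add);
        System.out.println(sink); // [RAW]
    }
}
```

The flatMap shape fits better when a record without a `data` field should be dropped rather than passed through unchanged; the map shape keeps the one-in, one-out contract of a map function.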
> In the case of FlatMap linking map, if map returns null, an exception will be thrown in FlatMap
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-18961
> URL: https://issues.apache.org/jira/browse/FLINK-18961
> Project: Flink
> Issue Type: Bug
> Components: API / DataSet
> Affects Versions: 1.11.0
> Environment: Mac OS 10.13.6
> Kubernetes 1.16.8
> Flink 1.11.0
> Reporter: Ryan
> Priority: Minor
> Attachments: Lark20200814-173817.png, Lark20200814-173821.png, Lark20200814-173824.png
>
>
> I found a DataSet problem. When a FlatMap is followed by a map and the map returns null, an exception is thrown in the FlatMap. I think it's a problem with the operator chain. I will post screenshots of the corresponding call stack in the attachments.
> {code:java}
> text.filter(value -> value.f0.contains("any")).flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
>     @Override
>     public void flatMap(Tuple2<String, String> value, Collector<String> out) throws Exception {
>         Pattern pattern = Pattern.compile("\".*\"");
>         Matcher matcher = pattern.matcher(value.f0);
>         if (matcher.find()) {
>             String match = matcher.group(0);
>             out.collect(match); // the exception is thrown here
>         }
>     }
> }).map(value -> {
>     try {
>         String jsonS = value.replace("\"\"", "\"");
>         jsonS = jsonS.substring(1, jsonS.length() - 1);
>         JSONObject json = JSONObject.parseObject(jsonS);
>         String result = json.getJSONObject("body").getJSONObject("message").getString("data");
>         return result; // result is null here
>     } catch (Exception e) {
>         return value;
>     }
> }).print();
> Caused by: java.lang.NullPointerException: The system does not support records that are null. Null values are only supported as fields inside other objects.
>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:76)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at com.lemonbox.Test$1.flatMap(Test.java:42)
>     at com.lemonbox.Test$1.flatMap(Test.java:35)
>     at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:58)
>     at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)