Posted to issues@flink.apache.org by "Chesnay Schepler (Jira)" <ji...@apache.org> on 2020/08/14 10:00:17 UTC

[jira] [Commented] (FLINK-18961) In the case of FlatMap linking map, if map returns null, an exception will be thrown in FlatMap

    [ https://issues.apache.org/jira/browse/FLINK-18961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177670#comment-17177670 ] 

Chesnay Schepler commented on FLINK-18961:
------------------------------------------

This is indeed a bit unintuitive, but realistically it will not be changed, since the DataSet API is about to be subsumed.

Note that even if the map function were not chained, your try-catch block would still not work.
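The mechanism can be illustrated with a simplified, hypothetical model in plain Java (no Flink dependency). The classes `OutputCollector` and `ChainedMapDriver` below mirror frames in the stack trace, but they are stand-ins written for this sketch, not Flink's actual implementation. The point is that the null check runs in the collector that receives the map function's return value, after the user's lambda (and its try-catch) has already returned. With chaining, that collector call runs synchronously inside the flatMap's `out.collect(...)`, which is why the exception surfaces in the FlatMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified, hypothetical model of a chained DataSet pipeline:
// flatMap -> (chained) map -> output collector.
// Names mirror the stack trace but these are NOT Flink's real classes.
public class ChainingSketch {

    // Minimal stand-in for org.apache.flink.util.Collector.
    interface Collector<T> {
        void collect(T record);
    }

    // Stand-in for OutputCollector: rejects null records before shipping them.
    static final class OutputCollector implements Collector<String> {
        final List<String> shipped = new ArrayList<>();

        @Override
        public void collect(String record) {
            if (record == null) {
                throw new NullPointerException(
                    "The system does not support records that are null.");
            }
            shipped.add(record);
        }
    }

    // Stand-in for ChainedMapDriver: runs the user's map function, then
    // forwards the result. The null check happens in output.collect(),
    // i.e. after the user function has already returned.
    static final class ChainedMapDriver implements Collector<String> {
        private final Function<String, String> mapFn;
        private final Collector<String> output;

        ChainedMapDriver(Function<String, String> mapFn, Collector<String> output) {
            this.mapFn = mapFn;
            this.output = output;
        }

        @Override
        public void collect(String record) {
            String result = mapFn.apply(record); // user code, including its try-catch
            output.collect(result);              // a null result is rejected here
        }
    }

    // The user's map function: the try-catch only covers the lambda body.
    static String userMap(String value) {
        try {
            return null; // e.g. a JSON lookup that found no "data" field
        } catch (Exception e) {
            return value;
        }
    }

    // The user's flatMap: out.collect(...) is where the NPE surfaces,
    // because the chained map runs synchronously inside this call.
    static void userFlatMap(String value, Collector<String> out) {
        out.collect(value);
    }

    public static void main(String[] args) {
        OutputCollector output = new OutputCollector();
        Collector<String> chain = new ChainedMapDriver(ChainingSketch::userMap, output);
        try {
            userFlatMap("\"payload\"", chain);
        } catch (NullPointerException e) {
            System.out.println("NPE thrown from flatMap's out.collect: " + e.getMessage());
        }
    }
}
```

Without chaining, the map task itself would hand the function's result to its output collector, so the exception would still be raised outside the lambda's try-catch.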

Essentially, returning null anywhere is undefined behavior, so I'd suggest explicitly checking for null in your map function.
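A minimal sketch of such a null check, assuming the JSON lookup from the report is wrapped in a function: `extractData` is a hypothetical placeholder for the `JSONObject.parseObject(...)...getString("data")` chain, and falling back to the input string mirrors the reporter's existing catch block. Plain Java is used so the sketch runs without Flink; inside the job, the same body would go in the `map` lambda.

```java
import java.util.function.Function;

// Sketch of a map body with an explicit null check.
// extractData is a hypothetical stand-in for the JSON field lookup.
public class NullSafeMap {

    // Returns the extracted value, or falls back to the input when the
    // extraction throws or yields null, so a null record is never returned.
    static String map(String value, Function<String, String> extractData) {
        String result;
        try {
            result = extractData.apply(value);
        } catch (Exception e) {
            return value; // same fallback as the original catch block
        }
        return result != null ? result : value; // the explicit null check
    }

    public static void main(String[] args) {
        Function<String, String> missingField = v -> null; // lookup found nothing
        Function<String, String> failing = v -> { throw new IllegalArgumentException("bad JSON"); };
        Function<String, String> found = v -> "data";

        System.out.println(map("raw", missingField));
        System.out.println(map("raw", failing));
        System.out.println(map("raw", found));
    }
}
```

If records without a `data` field should be dropped rather than passed through, the map can instead be replaced by a `flatMap` that calls `out.collect(...)` only for non-null results.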

>  In the case of FlatMap linking map, if map returns null, an exception will be thrown in FlatMap
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18961
>                 URL: https://issues.apache.org/jira/browse/FLINK-18961
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet
>    Affects Versions: 1.11.0
>         Environment: Mac OS 10.13.6
> Kubernetes 1.16.8
> Flink 1.11.0
>            Reporter: Ryan
>            Priority: Minor
>         Attachments: Lark20200814-173817.png, Lark20200814-173821.png, Lark20200814-173824.png
>
>
> I found a DataSet problem. When a map is chained after a FlatMap and the map returns null, an exception is thrown in the FlatMap. I think it's a problem with the operator chain. I will post screenshots of the corresponding call stack in the attachments.
> {code:java}
> text.filter(value -> value.f0.contains("any")).flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
>             @Override
>             public void flatMap(Tuple2<String, String> value, Collector<String> out) throws Exception {
>                 Pattern pattern = Pattern.compile("\".*\"");
>                 Matcher matcher = pattern.matcher(value.f0);
>                 if (matcher.find()) {
>                     String match = matcher.group(0);
>                     out.collect(match); // the exception is thrown here
>                 }
>             }
>         }).map(value -> {
>             try {
>                 String jsonS = value.replace("\"\"", "\"");
>                 jsonS = jsonS.substring(1, jsonS.length() - 1);
>                 JSONObject json = JSONObject.parseObject(jsonS);
>                 String result = json.getJSONObject("body").getJSONObject("message").getString("data");
>                 return result; // result is null here
>             } catch (Exception e) {
>                 return value;
>             }
>         }).print();
>
> Caused by: java.lang.NullPointerException: The system does not support records that are null. Null values are only supported as fields inside other objects.
>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:76)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at com.lemonbox.Test$1.flatMap(Test.java:42)
>     at com.lemonbox.Test$1.flatMap(Test.java:35)
>     at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:58)
>     at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> {code}


