You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Flink Jira Bot (Jira)" <ji...@apache.org> on 2022/01/04 10:40:00 UTC

[jira] [Updated] (FLINK-6146) Incorrect function name given in exception thrown by DataSet.getType()

     [ https://issues.apache.org/jira/browse/FLINK-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flink Jira Bot updated FLINK-6146:
----------------------------------
      Labels: auto-deprioritized-major auto-deprioritized-minor  (was: auto-deprioritized-major stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Minor, please raise the priority and ask a committer to assign you the issue or revive the public discussion.


> Incorrect function name given in exception thrown by DataSet.getType()
> ----------------------------------------------------------------------
>
>                 Key: FLINK-6146
>                 URL: https://issues.apache.org/jira/browse/FLINK-6146
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> In the following code, this exception is thrown at the line marked {{// (1)}}:
> {noformat}
> Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
> 	at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
> 	at org.apache.flink.api.java.DataSet.distinct(DataSet.java:607)
> 	at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:28)
> 	at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:69)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
> 	at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
> 	at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:21)
> 	... 1 more
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
> 	... 6 more
> {noformat}
> The code:
> {code}
> import org.apache.flink.api.common.operators.Order;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.util.Collector;
> public class MainTest {
>     public static <K> DataSet<Tuple2<K, Float>> convertToFractionalRank(DataSet<Tuple2<K, Float>> key_score) {
>         // Sum within each key
>         // Result: ("", key, totScore)
>         DataSet<Tuple3<String, K, Float>> blank_key_totScore = 
>                 key_score 
>                         .groupBy(0).sum(1) 
>                         // Prepend with "" to prep for for join
>                         .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1));
>         // Count unique keys. Result: ("", numKeys)
>         DataSet<Tuple2<String, Integer>> blank_numKeys = 
>                 blank_key_totScore 
>                         .distinct(1)                                                                 // (1)
>                         .map(t -> new Tuple2<String, Integer>("", 1)) 
>                         .groupBy(0).sum(1);
>         // Sort scores into order, then return the fractional rank in the range [0, 1]
>         return blank_key_totScore 
>                 .coGroup(blank_numKeys) 
>                 .where(0).equalTo(0) 
>                 .with((Iterable<Tuple3<String, K, Float>> ai, Iterable<Tuple2<String, Integer>> bi,
>                         Collector<Tuple4<String, K, Float, Integer>> out) -> {
>                     int numKeys = bi.iterator().next().f1;
>                     for (Tuple3<String, K, Float> a : ai) {
>                         out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys));
>                     }
>                 }) 
>                 // Group by "" (i.e. make into one group, so all the scores can be sorted together)
>                 .groupBy(0) 
>                 // Sort in descending order of score (the highest score gets the lowest rank, and vice versa)
>                 .sortGroup(2, Order.DESCENDING) 
>                 // Convert sorted rank from [0, numKeys-1] -> [0, 1]
>                 .reduceGroup(
>                         (Iterable<Tuple4<String, K, Float, Integer>> iter, Collector<Tuple2<K, Float>> out) -> {
>                             int rank = 0;
>                             for (Tuple4<String, K, Float, Integer> t : iter) {
>                                 int numKeys = t.f3; // Same for all tuples
>                                 float fracRank = rank / (float) (numKeys - 1);
>                                 out.collect(new Tuple2<>(/* key = */ t.f1, fracRank));
>                                 rank++;
>                             }
>                         })
>                 .name("convert problem severity scores into building scores");
>     }
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSource<Tuple2<Tuple2<String, Integer>, Float>> ds = env.fromElements(
>                 new Tuple2<>(new Tuple2<>("x", 1), 1.0f), new Tuple2<>(new Tuple2<>("x", 2), 1.0f),
>                 new Tuple2<>(new Tuple2<>("x", 3), 1.0f), new Tuple2<>(new Tuple2<>("x", 3), 1.0f),
>                 new Tuple2<>(new Tuple2<>("y", 1), 1.0f), new Tuple2<>(new Tuple2<>("y", 1), 1.0f),
>                 new Tuple2<>(new Tuple2<>("y", 2), 1.0f), new Tuple2<>(new Tuple2<>("y", 3), 1.0f));
>         DataSet<Tuple2<Tuple2<String, Integer>, Float>> ds2 = convertToFractionalRank(ds);
>         System.out.println(ds2.collect());
>     }
> }
> {code}
> However, it is the {{distinct}} operator, used to compute an intermediate value, not the return value of the function, for which the type cannot be computed. The error message is quoting the wrong location information.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)