Posted to issues@flink.apache.org by "Luke Hutchison (JIRA)" <ji...@apache.org> on 2017/04/07 06:10:41 UTC

[jira] [Updated] (FLINK-6276) InvalidTypesException: Unknown Error. Type is null.

     [ https://issues.apache.org/jira/browse/FLINK-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Hutchison updated FLINK-6276:
----------------------------------
    Description: 
Quite frequently when writing Flink code, I get the exception {{InvalidTypesException: Unknown Error. Type is null.}} 

A small example that triggers it is:

{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class TestMain {

    @SafeVarargs
    public static <K, V> DataSet<Tuple2<K, List<V>>> join(V missingValuePlaceholder,
            DataSet<Tuple2<K, V>>... datasets) {
        DataSet<Tuple2<K, List<V>>> join = null;
        for (int i = 0; i < datasets.length; i++) {
            final int datasetIdx = i;
            if (datasetIdx == 0) {
                join = datasets[datasetIdx] //
                        .map(t -> new Tuple2<>(t.f0, Arrays.asList(t.f1))) //
                        .name("start join");
            } else {
                join = join.coGroup(datasets[datasetIdx]) //
                        .where(0).equalTo(0) //
                        .with((Iterable<Tuple2<K, List<V>>> li, Iterable<Tuple2<K, V>> ri,
                                Collector<Tuple2<K, List<V>>> out) -> {
                            K key = null;
                            List<V> vals = new ArrayList<>(datasetIdx + 1);
                            Iterator<Tuple2<K, List<V>>> lIter = li.iterator();
                            if (!lIter.hasNext()) {
                                for (int j = 0; j < datasetIdx; j++) {
                                    vals.add(missingValuePlaceholder);
                                }
                            } else {
                                Tuple2<K, List<V>> lt = lIter.next();
                                key = lt.f0;
                                vals.addAll(lt.f1);
                                if (lIter.hasNext()) {
                                    throw new RuntimeException("Got non-unique key: " + key);
                                }
                            }
                            Iterator<Tuple2<K, V>> rIter = ri.iterator();
                            if (!rIter.hasNext()) {
                                vals.add(missingValuePlaceholder);
                            } else {
                                Tuple2<K, V> rt = rIter.next();
                                key = rt.f0;
                                vals.add(rt.f1);
                                if (rIter.hasNext()) {
                                    throw new RuntimeException("Got non-unique key: " + key);
                                }
                            }
                            out.collect(new Tuple2<K, List<V>>(key, vals));
                        }) //
                        .name("join #" + datasetIdx);
            }
        }
        return join;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> x = //
                env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4), new Tuple2<>("c", 5));
        DataSet<Tuple2<String, Integer>> y = //
                env.fromElements(new Tuple2<>("b", 0), new Tuple2<>("c", 1), new Tuple2<>("d", 2));
        DataSet<Tuple2<String, Integer>> z = //
                env.fromElements(new Tuple2<>("c", 7), new Tuple2<>("d", 8), new Tuple2<>("e", 9));

        System.out.println(join(-1, x, y, z).collect());
    }
}
{code}

The stack trace that results is:

{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'join(TestMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
	at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:424)
	at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
	at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:74)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
	at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
	at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:23)
	... 1 more
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
	... 6 more
{noformat}

The code compiles and typechecks fine. Something may be wrong with the code, but either way Flink should report a more helpful error message.

A separate issue here is that the error message is reported for the wrong function: the problem is not with the return type of {{join(TestMain.java:23)}} but with some internal type (probably for one of the lambdas) inside the function. (It is actually the {{where}} clause that throws the exception.)
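
For reference, a possible workaround sketch (untested; the class/method names {{TestMainWithHints}}, {{joinWithHints}} and the {{resultType}} parameter are made up for illustration): the exception text suggests supplying type hints via {{returns(...)}}. Since {{K}} and {{V}} are erased inside the generic helper, the caller can build the result {{TypeInformation}} from a {{TypeHint}} and pass it in, and the helper can attach it to the operators whose lambdas defeat the type extractor. The join is reduced to two inputs and the co-group body is simplified, so this only illustrates where the hints would go:

{code}
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class TestMainWithHints {

    // Two-input variant of the join above; the caller supplies the
    // TypeInformation for Tuple2<K, List<V>> because K and V are erased here.
    public static <K, V> DataSet<Tuple2<K, List<V>>> joinWithHints(
            TypeInformation<Tuple2<K, List<V>>> resultType,
            DataSet<Tuple2<K, V>> left, DataSet<Tuple2<K, V>> right) {
        return left
                .map(t -> new Tuple2<>(t.f0, Arrays.asList(t.f1)))
                // Hint for the map lambda, whose return type the extractor cannot determine.
                .returns(resultType)
                .coGroup(right)
                .where(0).equalTo(0)
                .with((Iterable<Tuple2<K, List<V>>> li, Iterable<Tuple2<K, V>> ri,
                        Collector<Tuple2<K, List<V>>> out) -> {
                    // Simplified body: just forward the left side's entries.
                    for (Tuple2<K, List<V>> lt : li) {
                        out.collect(lt);
                    }
                })
                // Same hint for the co-group lambda's output type.
                .returns(resultType);
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> x =
                env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4));
        DataSet<Tuple2<String, Integer>> y =
                env.fromElements(new Tuple2<>("b", 0), new Tuple2<>("c", 1));

        // The caller knows the concrete types, so it can build the hint here.
        TypeInformation<Tuple2<String, List<Integer>>> resultType =
                TypeInformation.of(new TypeHint<Tuple2<String, List<Integer>>>() {});

        System.out.println(joinWithHints(resultType, x, y).collect());
    }
}
{code}

Alternatively, as the exception message itself suggests, the lambdas could be replaced by (non-generic) classes implementing {{MapFunction}}/{{CoGroupFunction}}, or the functions could implement {{ResultTypeQueryable}}.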



> InvalidTypesException: Unknown Error. Type is null.
> ---------------------------------------------------
>
>                 Key: FLINK-6276
>                 URL: https://issues.apache.org/jira/browse/FLINK-6276
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, DataSet API
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>


