Posted to issues@flink.apache.org by "dalongliu (Jira)" <ji...@apache.org> on 2022/10/08 11:44:00 UTC

[jira] [Created] (FLINK-29547) Select a[1] which is array type for parquet complex type throws ClassCastException

dalongliu created FLINK-29547:
---------------------------------

             Summary: Select a[1] which is array type for parquet complex type throws ClassCastException
                 Key: FLINK-29547
                 URL: https://issues.apache.org/jira/browse/FLINK-29547
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.16.0
            Reporter: dalongliu
             Fix For: 1.17.0


The following SQL test in HiveTableSourceITCase throws a ClassCastException:
{code:java}
batchTableEnv.executeSql(
        "create table parquet_complex_type_test("
                + "a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>) stored as parquet");
// load the hive module so that we can use the array, map and named_struct
// functions for conveniently writing complex data
batchTableEnv.loadModule("hive", new HiveModule());
batchTableEnv.useModules("hive", CoreModuleFactory.IDENTIFIER);

batchTableEnv
        .executeSql(
                "insert into parquet_complex_type_test"
                        + " select array(1, 2), map(1, 'val1', 2, 'val2'),"
                        + " named_struct('f1', 1, 'f2', 2)")
        .await();

Table src = batchTableEnv.sqlQuery("select a[1] from parquet_complex_type_test");
List<Row> rows = CollectionUtil.iteratorToList(src.execute().collect());{code}
The query should return a single row containing 1 (Flink SQL array indexing is one-based), but it fails with the following exception stack:
{code:java}
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.Integer;
    at BatchExecCalc$37.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
    at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:98)
    at org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:92)
    at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
    at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:401)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:748)
{code}

 

After debugging the code, I found the root cause: the source operator reads the array data from parquet in a vectorized way and emits it as ColumnarArrayData. In the calc operator this is converted to a GenericArrayData whose backing array has runtime type Object[] rather than Integer[]. Consequently, ArrayObjectArrayConverter#toExternal, which is supposed to convert it to Integer[], still returns an Object[], and the subsequent forced cast of that array to Integer[] throws the ClassCastException.
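
For illustration, here is a minimal standalone Java sketch of the cast problem (plain Java, not Flink code; it only mimics what the generated calc code does with the Object[] it gets back):
{code:java}
import java.util.Arrays;

public class ArrayCastSketch {
    public static void main(String[] args) {
        // Like the backing array of the GenericArrayData: the elements are
        // Integers, but the array's runtime type is Object[].
        Object[] backing = new Object[] {1, 2};

        // A cast cannot change an array's runtime type, so this fails just
        // like the generated calc code does.
        try {
            Integer[] broken = (Integer[]) backing;
        } catch (ClassCastException e) {
            System.out.println(e); // [Ljava.lang.Object; cannot be cast to [Ljava.lang.Integer;
        }

        // Copying into an array allocated with the expected component type works.
        Integer[] fixed = Arrays.copyOf(backing, backing.length, Integer[].class);
        System.out.println(Arrays.toString(fixed)); // prints [1, 2]
    }
}
{code}
A fix would presumably have to make the converter materialize the elements into an array of the expected external component type (Integer[] here) instead of handing back the Object[] as-is.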

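As a possible workaround until this is fixed (my assumption, not verified against this case), the vectorized parquet reader that produces the ColumnarArrayData could be bypassed by falling back to Hive's mapred record reader:
{code:java}
// Possible workaround (unverified for this issue): fall back to the mapred
// record reader so the source avoids the vectorized parquet path.
batchTableEnv.getConfig().set("table.exec.hive.fallback-mapred-reader", "true");
{code}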

