Posted to issues@flink.apache.org by "Zhaoyang Shao (Jira)" <ji...@apache.org> on 2023/09/22 18:22:00 UTC

[jira] [Created] (FLINK-33129) Can't create RowDataToAvroConverter for LocalZonedTimestampType logical type

Zhaoyang Shao created FLINK-33129:
-------------------------------------

             Summary: Can't create RowDataToAvroConverter for LocalZonedTimestampType logical type
                 Key: FLINK-33129
                 URL: https://issues.apache.org/jira/browse/FLINK-33129
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.17.1
            Reporter: Zhaoyang Shao
             Fix For: 1.17.1


Creating a converter with `RowDataToAvroConverters.createConverter` for the LocalZonedTimestampType logical type throws an exception. This is because the switch statement has no case for `LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE`.

Code: [https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java#L75]
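
To illustrate the failure mode, here is a minimal, self-contained sketch of a converter factory that dispatches on a type root and falls through to a default branch. It is not the Flink source: `MissingCaseSketch`, its `TypeRoot` enum and the stand-in conversions are hypothetical names that only mirror the shape of the problem described above.

```java
import java.util.function.Function;

// Hypothetical, simplified stand-in for a converter factory that switches on a type root.
// None of these names are Flink's; they only model the missing-case behaviour.
final class MissingCaseSketch {

    enum TypeRoot { INTEGER, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE }

    static Function<Object, Object> createConverter(TypeRoot root) {
        switch (root) {
            case INTEGER:
                // Values of this type are passed through unchanged.
                return value -> value;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                // Stand-in conversion: the value is assumed to already be epoch milliseconds.
                return value -> (Long) value;
            default:
                // TIMESTAMP_WITH_LOCAL_TIME_ZONE has no case of its own, so it lands here.
                throw new UnsupportedOperationException("Unsupported type: " + root);
        }
    }

    public static void main(String[] args) {
        try {
            createConverter(TypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, adding a `case TIMESTAMP_WITH_LOCAL_TIME_ZONE:` label next to the existing timestamp case would be enough to make the factory return a converter instead of throwing.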

 

We can convert the value to `LocalDateTime` and then to `TimestampData` using the method below. Could we then apply the same converter as for TIMESTAMP_WITHOUT_TIME_ZONE?

`TimestampData fromLocalDateTime(LocalDateTime dateTime)`
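
As a rough illustration of the conversion proposed above, the sketch below turns a `LocalDateTime` into the epoch-based long that an Avro timestamp field would carry, using only `java.time`. It does not use Flink's `TimestampData`; the class `LtzToEpochSketch` and its methods are hypothetical, and reading the wall-clock fields as UTC is an assumption made for the example.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper, not part of Flink: models the LocalDateTime -> epoch value step.
final class LtzToEpochSketch {

    // Epoch milliseconds, assuming the LocalDateTime fields are read as UTC.
    static long toEpochMillis(LocalDateTime dateTime) {
        return dateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Epoch microseconds under the same UTC assumption, e.g. for a precision-6 timestamp.
    static long toEpochMicros(LocalDateTime dateTime) {
        Instant instant = dateTime.toInstant(ZoneOffset.UTC);
        long wholeSecondsAsMicros = Math.multiplyExact(instant.getEpochSecond(), 1_000_000L);
        return Math.addExact(wholeSecondsAsMicros, instant.getNano() / 1_000L);
    }

    public static void main(String[] args) {
        LocalDateTime value = LocalDateTime.of(2023, 9, 22, 18, 22, 0);
        System.out.println(toEpochMillis(value));
        System.out.println(toEpochMicros(value));
    }
}
```

Whether the real converter should emit milliseconds or microseconds depends on the Avro logical type of the target field, which this sketch leaves open.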

Can the Flink team help add support for this logical type and its logical type root?

This is now a blocker for creating a Flink Iceberg consumer with Avro GenericRecord when the Iceberg table has a `TimestampTZ` field, which is converted to LocalZonedTimestampType.

See error below:
 Unsupported type: TIMESTAMP_LTZ(6) 
        stack:
          org.apache.flink.formats.avro.RowDataToAvroConverters.createConverter(RowDataToAvroConverters.java:186) 
          java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) 
          java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) 
          java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) 
          java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) 
          java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550) 
          java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) 
          java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517) 
          org.apache.flink.formats.avro.RowDataToAvroConverters.createRowConverter(RowDataToAvroConverters.java:224) 
          org.apache.flink.formats.avro.RowDataToAvroConverters.createConverter(RowDataToAvroConverters.java:178) 
          org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter.<init>(RowDataToAvroGenericRecordConverter.java:46) 
          org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter.fromIcebergSchema(RowDataToAvroGenericRecordConverter.java:60) 
          org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction.lazyConverter(AvroGenericRecordReaderFunction.java:93) 
          org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction.createDataIterator(AvroGenericRecordReaderFunction.java:85) 
          org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:39) 
          org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:27) 
          org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader.fetch(IcebergSourceSplitReader.java:74) 
          org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) 
          org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) 
          org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) 
          java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 
          java.util.concurrent.FutureTask.run(FutureTask.java:264) 
          java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
          java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
          java.lang.Thread.run(Thread.java:829)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)