Posted to issues@flink.apache.org by "Kevin Lam (Jira)" <ji...@apache.org> on 2022/03/10 16:03:00 UTC

[jira] [Commented] (FLINK-21406) Add AvroParquetFileRecordFormat

    [ https://issues.apache.org/jira/browse/FLINK-21406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504379#comment-17504379 ] 

Kevin Lam commented on FLINK-21406:
-----------------------------------

Hi, is there a ticket for implementing split support in AvroParquetFileRecordFormat? If so, is anyone working on it?

> Add AvroParquetFileRecordFormat
> -------------------------------
>
>                 Key: FLINK-21406
>                 URL: https://issues.apache.org/jira/browse/FLINK-21406
>             Project: Flink
>          Issue Type: New Feature
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Chesnay Schepler
>            Assignee: Jing Ge
>            Priority: Minor
>              Labels: auto-deprioritized-major, pull-request-available
>             Fix For: 1.15.0
>
>
> There is currently no easy way to read Avro GenericRecords from Parquet via the new {{FileSource}}.
> While helping out a user I started writing a FileRecordFormat that could do that, but it still needs some finishing work.
> The implementation is similar to our ParquetAvroWriters class, in that we just wrap some Parquet classes and bridge our FileSystem with their IO abstraction.
> The main goal was to have a format that reads data through our FileSystems rather than working directly against Hadoop, to prevent a ClassLoader leak from the S3AFileSystem (threads in a thread pool can keep references to the user classloader).
> According to the user it appears to be working, but it still needs some cleanup, ideally support for specific records, support for checkpointing (which should be fairly easy, I believe), maybe file splitting (I am not sure whether this works properly with Parquet), and of course tests and documentation.
> {code}
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.connector.file.src.reader.FileRecordFormat;
> import org.apache.flink.core.fs.FSDataInputStream;
> import org.apache.flink.core.fs.FileStatus;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
> import org.apache.parquet.avro.AvroParquetReader;
> import org.apache.parquet.hadoop.ParquetReader;
> import org.apache.parquet.io.DelegatingSeekableInputStream;
> import org.apache.parquet.io.InputFile;
> import org.apache.parquet.io.SeekableInputStream;
> import javax.annotation.Nullable;
> import java.io.IOException;
> public class ParquetAvroFileRecordFormat implements FileRecordFormat<GenericRecord> {
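>     // Note: the field is transient, so it is null once the format has been serialized and shipped;
>     // in this draft only getProducedType() reads it.
>     private final transient Schema schema;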
>     public ParquetAvroFileRecordFormat(Schema schema) {
>         this.schema = schema;
>     }
>     @Override
>     public Reader<GenericRecord> createReader(
>             Configuration config, Path filePath, long splitOffset, long splitLength)
>             throws IOException {
>         final FileSystem fs = filePath.getFileSystem();
>         final FileStatus status = fs.getFileStatus(filePath);
>         final FSDataInputStream in = fs.open(filePath);
>         return new MyReader(
>                 AvroParquetReader.<GenericRecord>builder(new InputFileWrapper(in, status.getLen()))
>                         .withDataModel(GenericData.get())
>                         .build());
>     }
>     @Override
>     public Reader<GenericRecord> restoreReader(
>             Configuration config,
>             Path filePath,
>             long restoredOffset,
>             long splitOffset,
>             long splitLength) {
>         // TODO: checkpoint restore is not implemented; this is only called when checkpointing is used
>         return null;
>     }
>     @Override
>     public boolean isSplittable() {
>         // splitting is not supported yet: createReader ignores splitOffset/splitLength and reads the whole file
>         return false;
>     }
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return new GenericRecordAvroTypeInfo(schema);
>     }
>     private static class MyReader implements FileRecordFormat.Reader<GenericRecord> {
>         private final ParquetReader<GenericRecord> parquetReader;
>         private MyReader(ParquetReader<GenericRecord> parquetReader) {
>             this.parquetReader = parquetReader;
>         }
>         @Nullable
>         @Override
>         public GenericRecord read() throws IOException {
>             return parquetReader.read();
>         }
>         @Override
>         public void close() throws IOException {
>             parquetReader.close();
>         }
>     }
>     private static class InputFileWrapper implements InputFile {
>         private final FSDataInputStream inputStream;
>         private final long length;
>         private InputFileWrapper(FSDataInputStream inputStream, long length) {
>             this.inputStream = inputStream;
>             this.length = length;
>         }
>         @Override
>         public long getLength() {
>             return length;
>         }
>         @Override
>         public SeekableInputStream newStream() {
>             return new SeekableInputStreamAdapter(inputStream);
>         }
>     }
>     private static class SeekableInputStreamAdapter extends DelegatingSeekableInputStream {
>         private final FSDataInputStream inputStream;
>         private SeekableInputStreamAdapter(FSDataInputStream inputStream) {
>             super(inputStream);
>             this.inputStream = inputStream;
>         }
>         @Override
>         public long getPos() throws IOException {
>             return inputStream.getPos();
>         }
>         @Override
>         public void seek(long newPos) throws IOException {
>             inputStream.seek(newPos);
>         }
>     }
> }
> {code}
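
On the splitting question: Parquet data can only be read in units of whole row groups, so a splittable format would have to map each byte-range split to a set of row groups. One common rule, and as far as I know the one parquet-mr applies when it filters metadata by file range, is to give a row group to the split that contains its midpoint. The sketch below only illustrates that rule with plain Java; {{RowGroup}}, {{rowGroupsForSplit}} and the offsets in {{main}} are hypothetical and not part of Flink or Parquet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowGroupSplitAssignment {

    /** Hypothetical stand-in for the position of one Parquet row group in a file. */
    static final class RowGroup {
        final long startOffset;
        final long compressedSize;

        RowGroup(long startOffset, long compressedSize) {
            this.startOffset = startOffset;
            this.compressedSize = compressedSize;
        }

        long midpoint() {
            return startOffset + compressedSize / 2;
        }
    }

    /**
     * Returns the row groups whose midpoint lies in [splitOffset, splitOffset + splitLength).
     * With non-overlapping splits that cover the file, each row group lands in exactly one split.
     */
    public static List<RowGroup> rowGroupsForSplit(
            List<RowGroup> allRowGroups, long splitOffset, long splitLength) {
        final long splitEnd = splitOffset + splitLength;
        final List<RowGroup> selected = new ArrayList<>();
        for (RowGroup rowGroup : allRowGroups) {
            final long mid = rowGroup.midpoint();
            if (mid >= splitOffset && mid < splitEnd) {
                selected.add(rowGroup);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Made-up layout: three row groups in a file that is cut into two 1300-byte splits.
        List<RowGroup> rowGroups = Arrays.asList(
                new RowGroup(4, 1000), new RowGroup(1004, 1000), new RowGroup(2004, 500));

        System.out.println(rowGroupsForSplit(rowGroups, 0, 1300).size());    // prints 1
        System.out.println(rowGroupsForSplit(rowGroups, 1300, 1300).size()); // prints 2
    }
}
```

With a rule like this, {{isSplittable()}} could return true and {{createReader}} would read only the row groups selected for its {{splitOffset}}/{{splitLength}}. Whether that fits the FileRecordFormat contract in practice is exactly what the ticket would need to settle.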


