Posted to issues@flink.apache.org by "Kevin Lam (Jira)" <ji...@apache.org> on 2022/03/10 16:03:00 UTC
[jira] [Commented] (FLINK-21406) Add AvroParquetFileRecordFormat
[ https://issues.apache.org/jira/browse/FLINK-21406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504379#comment-17504379 ]
Kevin Lam commented on FLINK-21406:
-----------------------------------
Hi, is there a ticket for someone to implement splitting functionality for AvroParquetFileRecordFormat? And if so, is someone working on it?
> Add AvroParquetFileRecordFormat
> -------------------------------
>
> Key: FLINK-21406
> URL: https://issues.apache.org/jira/browse/FLINK-21406
> Project: Flink
> Issue Type: New Feature
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Reporter: Chesnay Schepler
> Assignee: Jing Ge
> Priority: Minor
> Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.15.0
>
>
> There is currently no easy way to read avro GenericRecords from parquet via the new {{FileSource}}.
> While helping out a user I started writing a FileRecordFormat that could do that, but it requires some finalization.
> The implementation is similar to our ParquetAvroWriters class, in that we just wrap some parquet classes and bridge our FileSystem with their IO abstraction.
> The main goal was to have a format that reads data through our FileSystems, rather than working directly against Hadoop, to prevent a ClassLoader leak from the S3AFileSystem (threads in a thread pool can keep references to the user classloader).
> According to the user it appears to be working, but it will need some cleanup, ideally support for specific records, support for checkpointing (which should be fairly easy I believe), maybe splitting files (not sure whether this works properly with Parquet) and of course tests + documentation.
> {code}
> public class ParquetAvroFileRecordFormat implements FileRecordFormat<GenericRecord> {
>
>     private final transient Schema schema;
>
>     public ParquetAvroFileRecordFormat(Schema schema) {
>         this.schema = schema;
>     }
>
>     @Override
>     public Reader<GenericRecord> createReader(
>             Configuration config, Path filePath, long splitOffset, long splitLength)
>             throws IOException {
>         final FileSystem fs = filePath.getFileSystem();
>         final FileStatus status = fs.getFileStatus(filePath);
>         final FSDataInputStream in = fs.open(filePath);
>         return new MyReader(
>                 AvroParquetReader.<GenericRecord>builder(new InputFileWrapper(in, status.getLen()))
>                         .withDataModel(GenericData.get())
>                         .build());
>     }
>
>     @Override
>     public Reader<GenericRecord> restoreReader(
>             Configuration config,
>             Path filePath,
>             long restoredOffset,
>             long splitOffset,
>             long splitLength) {
>         // not called if checkpointing isn't used
>         return null;
>     }
>
>     @Override
>     public boolean isSplittable() {
>         // let's not worry about this for now
>         return false;
>     }
>
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return new GenericRecordAvroTypeInfo(schema);
>     }
>
>     private static class MyReader implements FileRecordFormat.Reader<GenericRecord> {
>
>         private final ParquetReader<GenericRecord> parquetReader;
>
>         private MyReader(ParquetReader<GenericRecord> parquetReader) {
>             this.parquetReader = parquetReader;
>         }
>
>         @Nullable
>         @Override
>         public GenericRecord read() throws IOException {
>             return parquetReader.read();
>         }
>
>         @Override
>         public void close() throws IOException {
>             parquetReader.close();
>         }
>     }
>
>     private static class InputFileWrapper implements InputFile {
>
>         private final FSDataInputStream inputStream;
>         private final long length;
>
>         private InputFileWrapper(FSDataInputStream inputStream, long length) {
>             this.inputStream = inputStream;
>             this.length = length;
>         }
>
>         @Override
>         public long getLength() {
>             return length;
>         }
>
>         @Override
>         public SeekableInputStream newStream() {
>             return new SeekableInputStreamAdapter(inputStream);
>         }
>     }
>
>     private static class SeekableInputStreamAdapter extends DelegatingSeekableInputStream {
>
>         private final FSDataInputStream inputStream;
>
>         private SeekableInputStreamAdapter(FSDataInputStream inputStream) {
>             super(inputStream);
>             this.inputStream = inputStream;
>         }
>
>         @Override
>         public long getPos() throws IOException {
>             return inputStream.getPos();
>         }
>
>         @Override
>         public void seek(long newPos) throws IOException {
>             inputStream.seek(newPos);
>         }
>     }
> }
> {code}
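> For context, a format like the one above would typically be plugged into the new {{FileSource}} roughly as follows. This is a sketch, not part of the issue: the {{FileSource.forRecordFileFormat}} builder method, the schema string, and the input path are assumptions for illustration.
> {code}
> // Sketch: wiring a FileRecordFormat into the new FileSource API.
> // The input path and schema below are hypothetical placeholders.
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.connector.file.src.FileSource;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class ReadParquetJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         Schema schema = new Schema.Parser().parse(/* your Avro schema JSON */ "...");
>
>         FileSource<GenericRecord> source =
>                 FileSource.forRecordFileFormat(
>                                 new ParquetAvroFileRecordFormat(schema),
>                                 new Path("s3://my-bucket/parquet-data/")) // hypothetical path
>                         .build();
>
>         DataStream<GenericRecord> records =
>                 env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-source");
>
>         records.print();
>         env.execute("Read Avro GenericRecords from Parquet");
>     }
> }
> {code}
> Since {{isSplittable()}} returns false in the sketch, each Parquet file would be read as a single split, which is what the question above about splitting support refers to.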
--
This message was sent by Atlassian Jira
(v8.20.1#820001)