You are viewing a plain-text version of this content. The canonical version is available in the original mailing-list archive (the hyperlink was lost in this plain-text rendering).
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/30 02:46:52 UTC

[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #180: [FLINK-28292] Get rid of BulkReaderFormatFactory and BulkWriterFormatFactory

tsreaper commented on code in PR #180:
URL: https://github.com/apache/flink-table-store/pull/180#discussion_r910557571


##########
flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java:
##########
@@ -19,31 +19,161 @@
 package org.apache.flink.table.store.format.avro;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.formats.avro.AbstractAvroBulkFormat;
+import org.apache.flink.formats.avro.AvroBuilder;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.AvroWriterFactory;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_OUTPUT_CODEC;
 
 /** Avro {@link FileFormat}. */
 public class AvroFileFormat extends FileFormat {
 
-    private final org.apache.flink.formats.avro.AvroFileFormatFactory factory;
     private final ReadableConfig formatOptions;
 
     public AvroFileFormat(ReadableConfig formatOptions) {
         super(org.apache.flink.formats.avro.AvroFileFormatFactory.IDENTIFIER);
-        this.factory = new org.apache.flink.formats.avro.AvroFileFormatFactory();
         this.formatOptions = formatOptions;
     }
 
     @Override
-    protected BulkDecodingFormat<RowData> getDecodingFormat() {
-        return factory.createDecodingFormat(null, formatOptions); // context is useless
+    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+            RowType type, int[][] projection, List<ResolvedExpression> filters) {
+        // avro is a file format that keeps schemas in file headers,
+        // if the schema given to the reader is not equal to the schema in header,
+        // reader will automatically map the fields and give back records with our desired
+        // schema
+        //
+        // for detailed discussion see comments in https://github.com/apache/flink/pull/18657
+        LogicalType producedType = Projection.of(projection).project(type);
+        return new AvroGenericRecordBulkFormat(
+                (RowType) producedType.copy(false), InternalTypeInfo.of(producedType));
     }
 
     @Override
-    protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
-        return factory.createEncodingFormat(null, formatOptions); // context is useless
+    public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {

Review Comment:
  Please add a comment noting that this implementation is copied from Flink.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org