Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/29 05:50:43 UTC

[GitHub] [flink-table-store] JingsongLi opened a new pull request, #180: [FLINK-28292] Get rid of BulkReaderFormatFactory and BulkWriterFormatFactory

JingsongLi opened a new pull request, #180:
URL: https://github.com/apache/flink-table-store/pull/180

   These APIs changed significantly between Flink 1.14 and 1.15 and are not compatible across versions.
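
   For context, the replacement is the table store's own format abstraction,
   org.apache.flink.table.store.format.FileFormat, which exposes reader and writer factory
   methods directly instead of going through the removed Flink factories. A rough sketch of
   its shape (the two method signatures are taken from the AvroFileFormat diff below; the
   constructor and the identifier field are assumptions for illustration):

    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.connector.file.src.FileSourceSplit;
    import org.apache.flink.connector.file.src.reader.BulkFormat;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.expressions.ResolvedExpression;
    import org.apache.flink.table.types.logical.RowType;

    import java.util.List;

    /** Sketch of the table store's FileFormat base class; the actual class may differ. */
    public abstract class FileFormat {

        protected final String formatIdentifier;

        protected FileFormat(String formatIdentifier) {
            this.formatIdentifier = formatIdentifier;
        }

        /** Creates a bulk reader factory, optionally applying projection and filter push-down. */
        public abstract BulkFormat<RowData, FileSourceSplit> createReaderFactory(
                RowType type, int[][] projection, List<ResolvedExpression> filters);

        /** Creates a bulk writer factory for the given row type. */
        public abstract BulkWriter.Factory<RowData> createWriterFactory(RowType type);
    }

   Concrete formats such as AvroFileFormat then implement these two methods against the
   corresponding Flink format classes, as the review discussion below shows.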



[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #180: [FLINK-28292] Get rid of BulkReaderFormatFactory and BulkWriterFormatFactory

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #180:
URL: https://github.com/apache/flink-table-store/pull/180#discussion_r910557571


##########
flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AvroFileFormat.java:
##########
@@ -19,31 +19,161 @@
 package org.apache.flink.table.store.format.avro;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.formats.avro.AbstractAvroBulkFormat;
+import org.apache.flink.formats.avro.AvroBuilder;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.AvroWriterFactory;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_OUTPUT_CODEC;
 
 /** Avro {@link FileFormat}. */
 public class AvroFileFormat extends FileFormat {
 
-    private final org.apache.flink.formats.avro.AvroFileFormatFactory factory;
     private final ReadableConfig formatOptions;
 
     public AvroFileFormat(ReadableConfig formatOptions) {
         super(org.apache.flink.formats.avro.AvroFileFormatFactory.IDENTIFIER);
-        this.factory = new org.apache.flink.formats.avro.AvroFileFormatFactory();
         this.formatOptions = formatOptions;
     }
 
     @Override
-    protected BulkDecodingFormat<RowData> getDecodingFormat() {
-        return factory.createDecodingFormat(null, formatOptions); // context is useless
+    public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+            RowType type, int[][] projection, List<ResolvedExpression> filters) {
+        // avro is a file format that keeps schemas in file headers;
+        // if the schema given to the reader is not equal to the schema in the header,
+        // the reader will automatically map the fields and return records with the
+        // desired schema
+        //
+        // for a detailed discussion see the comments in https://github.com/apache/flink/pull/18657
+        LogicalType producedType = Projection.of(projection).project(type);
+        return new AvroGenericRecordBulkFormat(
+                (RowType) producedType.copy(false), InternalTypeInfo.of(producedType));
     }
 
     @Override
-    protected EncodingFormat<BulkWriter.Factory<RowData>> getEncodingFormat() {
-        return factory.createEncodingFormat(null, formatOptions); // context is useless
+    public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {

Review Comment:
   Add comments stating that this implementation is copied from Flink.
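
   For reference, the Flink code being mirrored is the RowData writer factory inside
   org.apache.flink.formats.avro.AvroFileFormatFactory. A condensed sketch of that pattern
   (class and helper names here are illustrative and may differ from the code in this PR):

    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.core.fs.FSDataOutputStream;
    import org.apache.flink.formats.avro.AvroWriterFactory;
    import org.apache.flink.formats.avro.RowDataToAvroConverters;
    import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;

    import org.apache.avro.Schema;
    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumWriter;

    import java.io.IOException;
    import java.io.OutputStream;

    /** Sketch of a RowData Avro writer factory, mirroring Flink's AvroFileFormatFactory. */
    class RowDataAvroWriterFactory implements BulkWriter.Factory<RowData> {

        private final RowType rowType;
        private final String codec;

        RowDataAvroWriterFactory(RowType rowType, String codec) {
            this.rowType = rowType;
            this.codec = codec;
        }

        @Override
        public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException {
            Schema schema = AvroSchemaConverter.convertToSchema(rowType);
            RowDataToAvroConverters.RowDataToAvroConverter converter =
                    RowDataToAvroConverters.createConverter(rowType);

            // Delegate the actual Avro file writing to Flink's AvroWriterFactory / AvroBuilder.
            AvroWriterFactory<GenericRecord> avroFactory =
                    new AvroWriterFactory<GenericRecord>(
                            stream -> createDataFileWriter(schema, codec, stream));
            BulkWriter<GenericRecord> writer = avroFactory.create(out);

            // Adapt each RowData record to a GenericRecord before handing it to the Avro writer.
            return new BulkWriter<RowData>() {
                @Override
                public void addElement(RowData element) throws IOException {
                    writer.addElement((GenericRecord) converter.convert(schema, element));
                }

                @Override
                public void flush() throws IOException {
                    writer.flush();
                }

                @Override
                public void finish() throws IOException {
                    writer.finish();
                }
            };
        }

        private static DataFileWriter<GenericRecord> createDataFileWriter(
                Schema schema, String codec, OutputStream out) throws IOException {
            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
            DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<>(datumWriter);
            fileWriter.setCodec(CodecFactory.fromString(codec));
            fileWriter.create(schema, out);
            return fileWriter;
        }
    }

   In AvroFileFormat.createWriterFactory, such a factory would presumably be constructed with
   the row type and the codec read from formatOptions via AVRO_OUTPUT_CODEC (imported in the
   diff above), plus a comment noting the Flink origin as requested here.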





[GitHub] [flink-table-store] JingsongLi commented on pull request #180: [FLINK-28292] Get rid of BulkReaderFormatFactory and BulkWriterFormatFactory

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on PR #180:
URL: https://github.com/apache/flink-table-store/pull/180#issuecomment-1170748014

   > With this change we cannot support formats which are already supported by Flink. We'll need built-in parquet format support.
   
   Yes, we should do more for Parquet support. Work is still in progress in https://issues.apache.org/jira/browse/FLINK-27207




[GitHub] [flink-table-store] JingsongLi merged pull request #180: [FLINK-28292] Get rid of BulkReaderFormatFactory and BulkWriterFormatFactory

Posted by GitBox <gi...@apache.org>.
JingsongLi merged PR #180:
URL: https://github.com/apache/flink-table-store/pull/180

