Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/28 12:51:42 UTC

[GitHub] [flink] AHeise commented on a change in pull request #17501: [Draft][FLINK-21406][RecordFormat] build AvroParquetRecordFormat for the new FileSource

AHeise commented on a change in pull request #17501:
URL: https://github.com/apache/flink/pull/17501#discussion_r738073990



##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/RecordFormat.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A reader format that reads individual records from a file via {@link Path}.
+ *
+ * <p>This interface teams up with its superinterface together build a 2-levels API. {@link
+ * StreamFormat} focuses on abstract input stream and {@link RecordFormat} pays attention to the
+ * concrete FileSystem. This format is for cases where the readers need access to the file directly
+ * or need to create a custom stream. For readers that can directly work on input streams, consider
+ * using the superinterface {@link StreamFormat}.
+ *
+ * <p>Please refer the javadoc of {@link StreamFormat} for details.
+ *
+ * @param <T> - The type of records created by this format reader.
+ */
+@PublicEvolving
+public interface RecordFormat<T> extends StreamFormat<T> {
+
+    /**
+     * Creates a new reader to read in this format. This method is called when a fresh reader is
+     * created for a split that was assigned from the enumerator. This method may also be called on
+     * recovery from a checkpoint, if the reader never stored an offset in the checkpoint (see
+     * {@link #restoreReader(Configuration, Path, long, long, long)} for details.
+     *
+     * <p>Provide the default implementation, subclasses are therefore not forced to implement it.
+     * Compare to the {@link #createReader(Configuration, FSDataInputStream, long, long)}, This
+     * method put the focus on the {@link Path}. The default implementation adapts information given
+     * by method arguments to {@link FSDataInputStream} and calls {@link
+     * #createReader(Configuration, FSDataInputStream, long, long)}.
+     *
+     * <p>If the format is {@link #isSplittable() splittable}, then the {@code inputStream} is
+     * positioned to the beginning of the file split, otherwise it will be at position zero.
+     */
+    default StreamFormat.Reader<T> createReader(
+            Configuration config, Path filePath, long splitOffset, long splitLength)
+            throws IOException {
+
+        checkNotNull(filePath, "filePath");
+
+        final FileSystem fileSystem = filePath.getFileSystem();
+        final FileStatus fileStatus = fileSystem.getFileStatus(filePath);
+        final FSDataInputStream inputStream = fileSystem.open(filePath);
+
+        if (isSplittable()) {
+            inputStream.seek(splitOffset);
+        }

Review comment:
       Should there be an `else` branch here with `checkArgument(splitOffset == 0)` and `checkArgument(splitLength == fileStatus.getLen())`?
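
To make the suggestion concrete, here is a minimal, dependency-free sketch of the check. The class `SplitArgumentCheck` and the method `resolveStartPosition` are hypothetical names used only for illustration; in the PR the same checks would go through `checkArgument` inside the default `createReader`, with `fileStatus.getLen()` as the file length.

```java
/** Hypothetical helper, for illustration only; not part of the PR. */
final class SplitArgumentCheck {

    private SplitArgumentCheck() {}

    /**
     * Returns the position the stream should be seeked to. For a non-splittable
     * format, the only valid "split" is the whole file, so anything else is rejected.
     */
    static long resolveStartPosition(
            boolean splittable, long splitOffset, long splitLength, long fileLength) {
        if (splittable) {
            // A splittable format starts reading at the beginning of its split.
            return splitOffset;
        }
        if (splitOffset != 0) {
            throw new IllegalArgumentException(
                    "Non-splittable format requires splitOffset == 0, but was " + splitOffset);
        }
        if (splitLength != fileLength) {
            throw new IllegalArgumentException(
                    "Non-splittable format requires splitLength == file length ("
                            + fileLength + "), but was " + splitLength);
        }
        return 0L;
    }
}
```

Failing fast here would surface a misconfigured enumerator immediately instead of silently reading the whole file for every split.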

##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.avro;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.RecordFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** */
+public class AvroParquetRecordFormat implements RecordFormat<GenericRecord> {

Review comment:
       I agree that this is confusing for folks that have used the `AvroParquetWriter`. `ParquetAvroWriters` is, however, easier to find for Flink users without prior knowledge. So I'm a bit torn here. Let's include @StephanEwen in the discussion.
   
   If we go with consistency with Parquet, then we should deprecate the existing `ParquetAvroWriters`, move the code to `AvroParquetWriters`, and use deprecated forward functions from old to new in a separate commit.
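
A rough sketch of what the deprecated forwarding could look like. The `String` parameter and return type are placeholders so that the snippet compiles on its own; the real factory methods take an Avro schema (or a record class) and return a Flink writer factory.

```java
/** New home of the factory methods; the name mirrors Parquet's own AvroParquetWriter. */
final class AvroParquetWriters {

    private AvroParquetWriters() {}

    // Placeholder signature, assumed for illustration only.
    static String forGenericRecord(String schemaJson) {
        return "writer[" + schemaJson + "]";
    }
}

/** Old entry point, kept only so that existing user code keeps compiling. */
@Deprecated
final class ParquetAvroWriters {

    private ParquetAvroWriters() {}

    /** @deprecated Use {@link AvroParquetWriters#forGenericRecord(String)} instead. */
    @Deprecated
    static String forGenericRecord(String schemaJson) {
        // Pure forwarding: no logic stays behind in the deprecated class.
        return AvroParquetWriters.forGenericRecord(schemaJson);
    }
}
```

Keeping the old class as a thin forwarder means users only see a deprecation warning and can migrate at their own pace.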

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/RecordFormat.java
##########
@@ -0,0 +1,130 @@
+/*

Review comment:
       Please change commit message to something like
   ```
   [FLINK-21406][file connector] Added RecordFormat
   
   1. Create RecordFormat interface, which focuses on Path, with default method implementations that delegate createReader() calls to the overloaded methods from StreamFormat, which focus on FSDataInputStream.
   2. Create AvroParquetRecordFormat implementation. Only reading avro GenericRecord from parquet file or stream is supported in this version.
   3. Splitting is not supported in this version.
   ```
   The list of components is not clearly defined (looking at JIRA or a git blame usually helps), but the tag should give other Flink devs a rough idea of which module was changed. We should also always provide a crisp first-line description, as that's the only thing shown by default in many tools.
   
   We could even expand the description to `Added RecordFormat as an opinionated StreamRecord` or so.

##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.avro;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.RecordFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** */
+public class AvroParquetRecordFormat implements RecordFormat<GenericRecord> {
+
+    private final transient Schema schema;
+
+    public AvroParquetRecordFormat(Schema schema) {
+        this.schema = schema;
+    }
+
+    /**
+     * Creates a new reader to read avro {@link GenericRecord} from Parquet input stream.
+     *
+     * <p>Several wrapper classes haven be created to Flink abstraction become compatible with the
+     * parquet abstraction. Please refer to the inner classes {@link GenericRecordReader}, {@link
+     * ParquetInputFile}, {@link FSDataInputStreamAdapter} for details.
+     */
+    @Override
+    public Reader<GenericRecord> createReader(
+            Configuration config, FSDataInputStream stream, long fileLen, long splitEnd)
+            throws IOException {
+
+        // current version does not support splitting.
+        checkNotSplit(fileLen, splitEnd);
+
+        return new GenericRecordReader(
+                AvroParquetReader.<GenericRecord>builder(new ParquetInputFile(stream, fileLen))
+                        .withDataModel(GenericData.get())
+                        .build());
+    }
+
+    /**
+     * Restores the reader from a checkpointed position. Since current version does not support
+     * splitting,
+     */
+    @Override
+    public Reader<GenericRecord> restoreReader(
+            Configuration config,
+            FSDataInputStream stream,
+            long restoredOffset,
+            long fileLen,
+            long splitEnd)
+            throws IOException {
+
+        // current version does not support splitting.
+        checkNotSplit(fileLen, splitEnd);
+
+        // current version just ignore the splitOffset and use restoredOffset
+        stream.seek(restoredOffset);
+
+        return createReader(config, stream, fileLen, splitEnd);
+    }
+
+    /** Current version does not support splitting. */
+    @Override
+    public boolean isSplittable() {
+        return false;
+    }
+
+    /**
+     * Gets the type produced by this format. This type will be the type produced by the file source
+     * as a whole.
+     */
+    @Override
+    public TypeInformation<GenericRecord> getProducedType() {
+        return new GenericRecordAvroTypeInfo(schema);
+    }
+
+    private static void checkNotSplit(long fileLen, long splitEnd) {
+        if (splitEnd != fileLen) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Current version of AvroParquetRecordFormat is not splittable, "
+                                    + "but found split end (%d) different from file length (%d)",
+                            splitEnd, fileLen));
+        }
+    }
+
+    /**
+     * {@link RecordFormat.Reader} implementation. Using {@link ParquetReader} internally to read
+     * avro {@link GenericRecord} from parquet {@link InputFile}.
+     */
+    private static class GenericRecordReader implements RecordFormat.Reader<GenericRecord> {

Review comment:
       How would a `SpecificRecordReader` look different? I suspect that we can use the same class for all flavors.
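
To illustrate the question above: if the reader is made generic over the record type, nothing in it is specific to `GenericRecord`. The following is only a sketch under assumptions. `RecordSource` is a stand-in for the Parquet reader (whose `read()` returns `null` at the end of the input), and `AvroParquetRecordReader` is a hypothetical name.

```java
import java.io.IOException;

/** Stand-in for the Parquet reader: returns the next record, or null at end of input. */
interface RecordSource<E> {
    E read() throws IOException;
}

/** One reader for every record flavor; only the type parameter differs. */
final class AvroParquetRecordReader<E> {

    private final RecordSource<E> source;
    private long recordsRead;

    AvroParquetRecordReader(RecordSource<E> source) {
        this.source = source;
    }

    /** Returns the next record, or null once the source is exhausted. */
    E read() throws IOException {
        E record = source.read();
        if (record != null) {
            recordsRead++;
        }
        return record;
    }

    /** Number of records handed out so far, e.g. for checkpointing a position. */
    long getRecordsRead() {
        return recordsRead;
    }
}
```

With this shape, the flavor (generic, specific, reflect) would be decided where the underlying reader is built, not in the wrapper.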

##########
File path: flink-formats/flink-parquet/pom.xml
##########
@@ -45,6 +45,14 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<!-- Flink-avro -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>

Review comment:
       Yes, please make it `optional`. For non-avro parquet use cases, the user will receive smaller user jars.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/RecordFormat.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       The commit is mixing concerns as also seen in your commit message: `RecordFormat` does not depend on `AvroParquetRecordFormat`, so I'd split the two things:
   1. First, a commit with `RecordFormat` that introduces and motivates it in the commit message. It also needs test coverage.
   2. `AvroParquetRecordFormat` as is with tests.

##########
File path: flink-formats/flink-avro/pom.xml
##########
@@ -26,7 +26,7 @@ under the License.
 		<groupId>org.apache.flink</groupId>
 		<artifactId>flink-formats</artifactId>
 		<version>1.15-SNAPSHOT</version>
-		<relativePath>..</relativePath>

Review comment:
       Please don't change the project setup stuff just for one module. We usually value consistency over correctness. If there is a general problem, pull it into a separate commit/PR and solve it for all modules.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/RecordFormat.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A reader format that reads individual records from a file via {@link Path}.
+ *
+ * <p>This interface teams up with its superinterface together build a 2-levels API. {@link
+ * StreamFormat} focuses on abstract input stream and {@link RecordFormat} pays attention to the
+ * concrete FileSystem. This format is for cases where the readers need access to the file directly
+ * or need to create a custom stream. For readers that can directly work on input streams, consider
+ * using the superinterface {@link StreamFormat}.
+ *
+ * <p>Please refer the javadoc of {@link StreamFormat} for details.
+ *
+ * @param <T> - The type of records created by this format reader.
+ */
+@PublicEvolving
+public interface RecordFormat<T> extends StreamFormat<T> {
+
+    /**
+     * Creates a new reader to read in this format. This method is called when a fresh reader is
+     * created for a split that was assigned from the enumerator. This method may also be called on
+     * recovery from a checkpoint, if the reader never stored an offset in the checkpoint (see
+     * {@link #restoreReader(Configuration, Path, long, long, long)} for details.
+     *
+     * <p>Provide the default implementation, subclasses are therefore not forced to implement it.
+     * Compare to the {@link #createReader(Configuration, FSDataInputStream, long, long)}, This
+     * method put the focus on the {@link Path}. The default implementation adapts information given
+     * by method arguments to {@link FSDataInputStream} and calls {@link
+     * #createReader(Configuration, FSDataInputStream, long, long)}.
+     *
+     * <p>If the format is {@link #isSplittable() splittable}, then the {@code inputStream} is
+     * positioned to the beginning of the file split, otherwise it will be at position zero.
+     */
+    default StreamFormat.Reader<T> createReader(

Review comment:
       Where is this method called? It feels like this is mostly for testing. I think we should just add these methods directly to `StreamFormat<T>` instead of creating a fourth interface. Or, if it's for testing only, we could add a wrapper class around it.

##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.avro;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.RecordFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** */
+public class AvroParquetRecordFormat implements RecordFormat<GenericRecord> {
+
+    private final transient Schema schema;

Review comment:
       Without a serializable Schema, this field will be `null` on all TMs. You can try to use `SerializableAvroSchema`, or we could move to Avro 1.10.X, where Schema is serializable. The writer part uses a string representation of the schema to circumvent the issue.
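
A small, dependency-free sketch of the string-based workaround. `ParsedSchema` is a stand-in for a schema type that is not serializable, and `SchemaHoldingFormat` is a hypothetical name; the real format would parse the string back into an Avro schema instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Stand-in for a schema type that does not implement Serializable. */
final class ParsedSchema {
    final String json;

    ParsedSchema(String json) {
        this.json = json;
    }
}

/** Ships the schema as a string and re-parses it lazily after deserialization. */
final class SchemaHoldingFormat implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String schemaString;

    // Transient fields are not serialized, so this is null after deserialization.
    private transient ParsedSchema schema;

    SchemaHoldingFormat(ParsedSchema schema) {
        this.schema = schema;
        this.schemaString = schema.json;
    }

    ParsedSchema getSchema() {
        if (schema == null) {
            // Rebuild from the serialized string representation.
            schema = new ParsedSchema(schemaString);
        }
        return schema;
    }

    boolean isSchemaCached() {
        return schema != null;
    }

    /** Round-trips through Java serialization, similar to shipping the format to a TM. */
    static SchemaHoldingFormat roundTrip(SchemaHoldingFormat format)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(format);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SchemaHoldingFormat) in.readObject();
        }
    }
}
```

The round trip shows the problem and the fix together: the transient field arrives as `null`, and `getSchema()` restores it from the string.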



