Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/14 15:18:59 UTC

[GitHub] [flink] MartijnVisser commented on a change in pull request #19083: [FLINK-26604][doc] add more information for Avro records support and clean up redundant content of bounded and unbounded data

MartijnVisser commented on a change in pull request #19083:
URL: https://github.com/apache/flink/pull/19083#discussion_r826047567



##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -30,7 +30,7 @@ under the License.
 
 Flink supports reading [Parquet](https://parquet.apache.org/) files, 
 producing {{< javadoc file="org/apache/flink/table/data/RowData.html" name="Flink RowData">}} and producing [Avro](https://avro.apache.org/) records.
-To use the format you need to add the Flink Parquet dependency to your project:
+To use the format you need to add the flink-parquet dependency to your project for reading Flink RowData:

Review comment:
       ```suggestion
   To use the format you need to add the `flink-parquet` dependency to your project:
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,71 @@ To use the format you need to add the Flink Parquet dependency to your project:
 	<version>{{< version >}}</version>
 </dependency>
 ```
- 
+
+For reading Avro records, parquet-avro dependency is required additionally:
+
+```xml
+<dependency>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet-avro</artifactId>
+    <version>${flink.format.parquet.version}</version>
+    <optional>true</optional>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+        </exclusion>
+    </exclusions>

Review comment:
       Are these exclusions only needed when using this in combination with `flink-parquet`? Or can we already exclude these by default?

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,71 @@ To use the format you need to add the Flink Parquet dependency to your project:
 	<version>{{< version >}}</version>
 </dependency>
 ```
- 
+
+For reading Avro records, parquet-avro dependency is required additionally:
+
+```xml
+<dependency>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet-avro</artifactId>
+    <version>${flink.format.parquet.version}</version>
+    <optional>true</optional>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
 This format is compatible with the new Source that can be used in both batch and streaming modes.
 Thus, you can use this format for two kinds of data:
-- Bounded data
-- Unbounded data: monitors a directory for new files that appear 
+- Bounded data: lists all files and reads them all.
+- Unbounded data: monitors a directory for new files that appear.
 
-## Flink RowData
+By default, a File Source is created in the bounded mode, to turn the source into the continuous unbounded mode you can call 
+`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` additionally .
 
-#### Bounded data example
+**Batch mode**
 
-In this example, you will create a DataStream containing Parquet records as Flink RowDatas. The schema is projected to read only the specified fields ("f7", "f4" and "f99").  
-Flink will read records in batches of 500 records. The first boolean parameter specifies that timestamp columns will be interpreted as UTC.
-The second boolean instructs the application that the projected Parquet fields names are case-sensitive.
-There is no watermark strategy defined as records do not contain event timestamps.
+```java
+
+// reads bounded data of records from files at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+        
+// reads unbounded data of records from files by monitoring the Paths
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+        .monitorContinuously(Duration.ofMillis(5L))
+        .build();
+
+```
+
+**Streaming mode** 
 
 ```java
-final LogicalType[] fieldTypes =
-  new LogicalType[] {
-  new DoubleType(), new IntType(), new VarCharType()
-  };
 
-final ParquetColumnarRowInputFormat<FileSourceSplit> format =
-  new ParquetColumnarRowInputFormat<>(
-  new Configuration(),
-  RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
-  500,
-  false,
-  true);
-final FileSource<RowData> source =
-  FileSource.forBulkFileFormat(format,  /* Flink Path */)
-  .build();
-final DataStream<RowData> stream =
-  env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+// reads bounded data of records from files at a time
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads unbounded data of records from files by monitoring the Paths
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        .monitorContinuously(Duration.ofMillis(5L))
+        .build();
+
+
 ```
 
-#### Unbounded data example
+From now on, this document will only show you examples for bounded data. You can add a call of

Review comment:
       If we only show examples for either bounded or unbounded data, I would prefer that the examples show unbounded data and explain how to apply them to bounded data, since batch is a special case of streaming from a Flink perspective.

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -329,22 +296,94 @@ final DataStream<GenericRecord> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
 
-#### Unbounded data example
+#### Prerequisite of Parquet files
+
+In order to support reading Avro Reflect records, the parquet file must contain specific meta information.
+The Avro schema used for creating the parquet data must contain a `namespace`, 
+which will be used by the program to identify the concrete Java class for the reflection process.
+
+The following example shows the User schema used previously. But this time it contains a namespace 
+pointing to the location(in this case the package), where the User class for the reflection could be found.
+
+```java
+// avro schema with namespace
+final String schema = 
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"namespace\": \"org.apache.flink.formats.parquet.avro\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" },\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }";
+
+```
+
+Parquet files created with this schema will contain meta information like:
 
-This example, similar to the bounded batch example, uses the same POJO Java class `Datum`
-and monitors for the new files every second to read Avro Reflect records from Parquet files
-infinitely as new files are added to the directory.
+```text
+creator:        parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
+extra:          parquet.avro.schema =
+{"type":"record","name":"User","namespace":"org.apache.flink.formats.parquet.avro","fields":[{"name":"name","type":"string"},{"name":"favoriteNumber","type":["int","null"]},{"name":"favoriteColor","type":["string","null"]}]}
+extra:          writer.model.name = avro
+
+file schema:    org.apache.flink.formats.parquet.avro.User
+--------------------------------------------------------------------------------
+name:           REQUIRED BINARY L:STRING R:0 D:0
+favoriteNumber: OPTIONAL INT32 R:0 D:1
+favoriteColor:  OPTIONAL BINARY L:STRING R:0 D:1
+
+row group 1:    RC:3 TS:143 OFFSET:4
+--------------------------------------------------------------------------------
+name:            BINARY UNCOMPRESSED DO:0 FPO:4 SZ:47/47/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: Jack, max: Tom, num_nulls: 0]
+favoriteNumber:  INT32 UNCOMPRESSED DO:0 FPO:51 SZ:41/41/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
+favoriteColor:   BINARY UNCOMPRESSED DO:0 FPO:92 SZ:55/55/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: green, max: yellow, num_nulls: 0]
+
+```
+
+With the User class defined in the package org.apache.flink.formats.parquet.avro:

Review comment:
       ```suggestion
   With the `User` class defined in the package org.apache.flink.formats.parquet.avro:
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,71 @@ To use the format you need to add the Flink Parquet dependency to your project:
 	<version>{{< version >}}</version>
 </dependency>
 ```
- 
+
+For reading Avro records, parquet-avro dependency is required additionally:
+
+```xml
+<dependency>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet-avro</artifactId>
+    <version>${flink.format.parquet.version}</version>
+    <optional>true</optional>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
 This format is compatible with the new Source that can be used in both batch and streaming modes.
 Thus, you can use this format for two kinds of data:
-- Bounded data
-- Unbounded data: monitors a directory for new files that appear 
+- Bounded data: lists all files and reads them all.
+- Unbounded data: monitors a directory for new files that appear.
 
-## Flink RowData
+By default, a File Source is created in the bounded mode, to turn the source into the continuous unbounded mode you can call 
+`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` additionally .
 
-#### Bounded data example
+**Batch mode**

Review comment:
       I think Flink still uses the terms bounded and unbounded data here. Batch mode refers to the execution mode.

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,71 @@ To use the format you need to add the Flink Parquet dependency to your project:
 	<version>{{< version >}}</version>
 </dependency>
 ```
- 
+
+For reading Avro records, parquet-avro dependency is required additionally:

Review comment:
       ```suggestion
   To read Avro records, you will need to add the `parquet-avro` dependency:
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,71 @@ To use the format you need to add the Flink Parquet dependency to your project:
 	<version>{{< version >}}</version>
 </dependency>
 ```
- 
+
+For reading Avro records, parquet-avro dependency is required additionally:
+
+```xml
+<dependency>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet-avro</artifactId>
+    <version>${flink.format.parquet.version}</version>
+    <optional>true</optional>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
 This format is compatible with the new Source that can be used in both batch and streaming modes.
 Thus, you can use this format for two kinds of data:
-- Bounded data
-- Unbounded data: monitors a directory for new files that appear 
+- Bounded data: lists all files and reads them all.
+- Unbounded data: monitors a directory for new files that appear.
 
-## Flink RowData
+By default, a File Source is created in the bounded mode, to turn the source into the continuous unbounded mode you can call 
+`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` additionally .
 
-#### Bounded data example
+**Batch mode**
 
-In this example, you will create a DataStream containing Parquet records as Flink RowDatas. The schema is projected to read only the specified fields ("f7", "f4" and "f99").  
-Flink will read records in batches of 500 records. The first boolean parameter specifies that timestamp columns will be interpreted as UTC.
-The second boolean instructs the application that the projected Parquet fields names are case-sensitive.
-There is no watermark strategy defined as records do not contain event timestamps.
+```java
+
+// reads bounded data of records from files at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+        
+// reads unbounded data of records from files by monitoring the Paths
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+        .monitorContinuously(Duration.ofMillis(5L))
+        .build();
+
+```
+
+**Streaming mode** 

Review comment:
       ```suggestion
   **Unbounded data** 
   ```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -329,22 +296,94 @@ final DataStream<GenericRecord> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
 
-#### Unbounded data example
+#### Prerequisite of Parquet files
+
+In order to support reading Avro Reflect records, the parquet file must contain specific meta information.
+The Avro schema used for creating the parquet data must contain a `namespace`, 
+which will be used by the program to identify the concrete Java class for the reflection process.
+
+The following example shows the User schema used previously. But this time it contains a namespace 
+pointing to the location(in this case the package), where the User class for the reflection could be found.

Review comment:
       ```suggestion
   In order to support reading Avro reflect records, the Parquet file must contain specific meta information.
   The Avro schema used for creating the Parquet data must contain a `namespace`, 
   which will be used by the program to identify the concrete Java class for the reflection process.
   
   The following example shows the `User` schema used previously. But this time it contains a namespace 
   pointing to the location (in this case the package) where the `User` class for the reflection can be found.
   ```
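
To illustrate why the namespace matters for the reflection step, here is a minimal sketch that uses only the JDK. It assumes that a reflection-based reader joins the schema's `namespace` and `name` into a fully qualified class name and then tries to load that class. The class `NamespaceLookupSketch` and its helper methods are hypothetical and are not part of Avro or Flink.

```java
// Sketch only: mimics how a reflection-based reader could map an Avro
// record schema to a Java class. This is NOT Avro's or Flink's actual code.
final class NamespaceLookupSketch {

    /** Joins an Avro namespace and record name into a fully qualified class name. */
    static String fullName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    /** Returns true if a class with the given fully qualified name can be loaded. */
    static boolean isResolvable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // With a namespace, the name matches the package-qualified class.
        String withNamespace = fullName("org.apache.flink.formats.parquet.avro", "User");
        // Without one, only the bare record name is left to look up.
        String withoutNamespace = fullName(null, "User");

        System.out.println(withNamespace + " resolvable: " + isResolvable(withNamespace));
        System.out.println(withoutNamespace + " resolvable: " + isResolvable(withoutNamespace));

        // A JDK class stands in for a class that really is on the classpath.
        System.out.println("java.lang.String resolvable: "
                + isResolvable(fullName("java.lang", "String")));
    }
}
```

Without a namespace, only the bare name `User` is left, which would match only a class in the default package. That may be worth stating explicitly in the docs.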



