Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/16 14:40:17 UTC

[flink] branch master updated: [FLINK-26604][doc] add more information for Avro records support and clean up redundant content of bounded and unbounded data.

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 579e554  [FLINK-26604][doc] add more information for Avro records support and clean up redundant content of bounded and unbounded data.
579e554 is described below

commit 579e554393e858bd7c2faf55aa4531f853c4ebe5
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Mon Mar 14 14:58:37 2022 +0100

    [FLINK-26604][doc] add more information for Avro records support and clean up redundant content of bounded and unbounded data.
    
    - mvn dependency
    - using namespace in schema for reflect records
    
    [FLINK-26604][doc] bug fix
---
 .../docs/connectors/datastream/formats/parquet.md  | 240 +++++++++++---------
 .../docs/connectors/datastream/formats/parquet.md  | 242 ++++++++++++---------
 2 files changed, 287 insertions(+), 195 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/formats/parquet.md b/docs/content.zh/docs/connectors/datastream/formats/parquet.md
index 46828e5..87c1a1b 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/parquet.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/parquet.md
@@ -30,7 +30,7 @@ under the License.
 
 Flink supports reading [Parquet](https://parquet.apache.org/) files,
 producing {{< javadoc file="org/apache/flink/table/data/RowData.html" name="Flink RowData">}} and producing [Avro](https://avro.apache.org/) records.
-To use the format you need to add the Flink Parquet dependency to your project:
+To use the format you need to add the `flink-parquet` dependency to your project:
 
 ```xml
 <dependency>
@@ -40,45 +40,77 @@ To use the format you need to add the Flink Parquet dependency to your project:
 </dependency>
 ```
 
-This format is compatible with the new Source that can be used in both batch and streaming modes.
+To read Avro records, you will need to add the `parquet-avro` dependency:
+
+```xml
+<dependency>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet-avro</artifactId>
+    <version>1.12.2</version>
+    <optional>true</optional>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
+This format is compatible with the new Source that can be used in both batch and streaming execution modes.
 Thus, you can use this format for two kinds of data:
-- Bounded data
-- Unbounded data: monitors a directory for new files that appear
 
-## Flink RowData
+- Bounded data: lists all files and reads them all.
+- Unbounded data: monitors a directory for new files that appear.
 
-#### Bounded data example
+{{< hint info >}}
+When you start a File Source, it is configured for bounded data by default.
+To configure the File Source for unbounded data, you must additionally call
+`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)`.
+{{< /hint >}}
 
-In this example, you will create a DataStream containing Parquet records as Flink RowDatas. The schema is projected to read only the specified fields ("f7", "f4" and "f99").  
-Flink will read records in batches of 500 records. The first boolean parameter specifies that timestamp columns will be interpreted as UTC.
-The second boolean instructs the application that the projected Parquet fields names are case-sensitive.
-There is no watermark strategy defined as records do not contain event timestamps.
+**Vectorized reader**
 
 ```java
-final LogicalType[] fieldTypes =
-  new LogicalType[] {
-  new DoubleType(), new IntType(), new VarCharType()
-  };
 
-final ParquetColumnarRowInputFormat<FileSourceSplit> format =
-  new ParquetColumnarRowInputFormat<>(
-  new Configuration(),
-  RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
-  500,
-  false,
-  true);
-final FileSource<RowData> source =
-  FileSource.forBulkFileFormat(format,  /* Flink Path */)
-  .build();
-final DataStream<RowData> stream =
-  env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+// Bounded data: Parquet rows are decoded in batches
+FileSource.forBulkFileFormat(BulkFormat, Path...)
+        .build();
+
+// Unbounded data: monitor the given Paths for new files
+FileSource.forBulkFileFormat(BulkFormat, Path...)
+        .monitorContinuously(Duration.ofMillis(5L))
+        .build();
+
+```
+
+**Avro Parquet reader**
+
+```java
+
+// Bounded data: read Avro records from the Parquet files
+FileSource.forRecordStreamFormat(StreamFormat, Path...)
+        .build();
+
+// Unbounded data: monitor the given Paths for new files
+FileSource.forRecordStreamFormat(StreamFormat, Path...)
+        .monitorContinuously(Duration.ofMillis(5L))
+        .build();
+
 ```
 
-#### Unbounded data example
+{{< hint info >}}
+The following examples are all configured for bounded data.
+To configure the File Source for unbounded data, you must additionally call
+`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)`.
+{{< /hint >}}
 
-In this example, you will create a DataStream containing Parquet records as Flink RowDatas that will
-infinitely grow as new files are added to the directory. It will monitor for new files each second.
-The schema is projected to read only the specified fields ("f7", "f4" and "f99").  
+## Flink RowData
+
+In this example, you will create a DataStream containing Parquet records as Flink RowDatas. The schema is projected to read only the specified fields ("f7", "f4" and "f99").  
 Flink will read records in batches of 500 records. The first boolean parameter specifies that timestamp columns will be interpreted as UTC.
 The second boolean instructs the application that the projected Parquet fields names are case-sensitive.
 There is no watermark strategy defined as records do not contain event timestamps.
@@ -98,7 +130,6 @@ final ParquetColumnarRowInputFormat<FileSourceSplit> format =
   true);
 final FileSource<RowData> source =
   FileSource.forBulkFileFormat(format,  /* Flink Path */)
-  .monitorContinuously(Duration.ofSeconds(1L))
   .build();
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
@@ -131,9 +162,7 @@ This example uses an Avro schema example similar to the one described in the [of
 
 This schema defines a record representing a user with three fields: name, favoriteNumber, and favoriteColor. You can find more details at [record specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for how to define an Avro schema.
 
-#### Bounded data example
-
-In this example, you will create a DataStream containing Parquet records as Avro Generic records.
+In the following example, you will create a DataStream containing Parquet records as Avro Generic records.
 It will parse the Avro schema based on the JSON string. There are many other ways to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer to [Avro Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html) for details.
 After that, you will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Generic records.
 
@@ -163,37 +192,6 @@ final DataStream<GenericRecord> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
 
-#### Unbounded data example
-
-This example is similar to the bounded batch example. The application monitors for new files every second
-and reads Avro Generic records from Parquet files infinitely as new files are added to the directory.
-```java
-// parsing avro schema
-final Schema schema =
-        new Schema.Parser()
-            .parse(
-                    "{\"type\": \"record\", "
-                        + "\"name\": \"User\", "
-                        + "\"fields\": [\n"
-                        + "        {\"name\": \"name\", \"type\": \"string\" },\n"
-                        + "        {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
-                        + "        {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
-                        + "    ]\n"
-                        + "    }");
-
-final FileSource<GenericRecord> source =
-        FileSource.forRecordStreamFormat(
-                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
-        .monitorContinuously(Duration.ofSeconds(1L))
-        .build();
-
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(10L);
-        
-final DataStream<GenericRecord> stream =
-        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
-```
-
 ### Specific record
 
 Based on the previously defined schema, you can generate classes by leveraging Avro code generation.
@@ -202,9 +200,7 @@ You can either use `avro-tools.jar` to generate code manually or you could use t
 code generation on any .avsc files present in the configured source directory. Please refer to
 [Avro Getting Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more information.
 
-#### Bounded data example
-
-This example uses the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+The following example uses the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
 
 ```json lines
 [
@@ -247,26 +243,6 @@ final DataStream<GenericRecord> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
 
-#### Unbounded data example
-
-This example, similar to the bounded batch example, uses the same generated Address Java class
-and monitors for the new files every second to read Avro Specific records from Parquet files
-infinitely as new files are added to the directory.
-
-```java
-final FileSource<GenericRecord> source =
-        FileSource.forRecordStreamFormat(
-                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink Path */)
-        .monitorContinuously(Duration.ofSeconds(1L))
-        .build();
-
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(10L);
-        
-final DataStream<GenericRecord> stream =
-        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
-```
-
 ### Reflect record
 
 Beyond Avro Generic and Specific record that requires a predefined Avro schema,
@@ -274,8 +250,6 @@ Flink also supports creating a DataStream from Parquet files based on existing J
 In this case, Avro will use Java reflection to generate schemas and protocols for these POJO classes.
 Java types are mapped to Avro schemas, please refer to the [Avro reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) documentation for more details.
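+
+As a quick check, you can inspect the schema that Avro derives for a POJO. This is only a small
+sketch using Avro's `ReflectData` utility and the `Datum` class from the example below; it is not
+required for the File Source itself:
+
+```java
+// print the Avro schema generated by reflection for the POJO;
+// its namespace is the Java package of the class
+final Schema reflectedSchema = ReflectData.get().getSchema(Datum.class);
+System.out.println(reflectedSchema.toString(true));
+```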
 
-#### Bounded data example
-
 This example uses a simple Java POJO class [Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):
 
 ```java
@@ -329,22 +303,94 @@ final DataStream<GenericRecord> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
 
-#### Unbounded data example
+#### Prerequisite for Parquet files
+
+In order to support reading Avro Reflect records, the Parquet file must contain specific meta information.
+The Avro schema used for creating the Parquet data must contain a `namespace`,
+which will be used by the program to identify the concrete Java class for the reflection process.
+
+The following example shows the `User` schema used previously, but this time it contains a namespace
+pointing to the location (in this case, the package) where the `User` class used for reflection can be found.
+
+```java
+// avro schema with namespace
+final String schema = 
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"namespace\": \"org.apache.flink.formats.parquet.avro\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" },\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }";
+
+```
+
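+For illustration, a Parquet file using this schema could be produced with `AvroParquetWriter`
+from the `parquet-avro` dependency shown earlier. This is only a sketch based on the `parquet-avro`
+library (not the Flink API); it additionally assumes a Hadoop client on the classpath, and
+`/tmp/users.parquet` is just an example path:
+
+```java
+// parse the schema string defined above
+final Schema avroSchema = new Schema.Parser().parse(schema);
+
+// write a single record; the resulting file embeds the schema, including the
+// namespace, in its "parquet.avro.schema" meta information
+try (ParquetWriter<GenericRecord> writer =
+        AvroParquetWriter.<GenericRecord>builder(
+                new org.apache.hadoop.fs.Path("/tmp/users.parquet"))
+            .withSchema(avroSchema)
+            .build()) {
+    GenericRecord user = new GenericData.Record(avroSchema);
+    user.put("name", "Jack");
+    user.put("favoriteNumber", 1);
+    user.put("favoriteColor", "green");
+    writer.write(user);
+}
+```
+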
+Parquet files created with this schema will contain meta information like:
 
-This example, similar to the bounded batch example, uses the same POJO Java class `Datum`
-and monitors for the new files every second to read Avro Reflect records from Parquet files
-infinitely as new files are added to the directory.
+```text
+creator:        parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
+extra:          parquet.avro.schema =
+{"type":"record","name":"User","namespace":"org.apache.flink.formats.parquet.avro","fields":[{"name":"name","type":"string"},{"name":"favoriteNumber","type":["int","null"]},{"name":"favoriteColor","type":["string","null"]}]}
+extra:          writer.model.name = avro
+
+file schema:    org.apache.flink.formats.parquet.avro.User
+--------------------------------------------------------------------------------
+name:           REQUIRED BINARY L:STRING R:0 D:0
+favoriteNumber: OPTIONAL INT32 R:0 D:1
+favoriteColor:  OPTIONAL BINARY L:STRING R:0 D:1
+
+row group 1:    RC:3 TS:143 OFFSET:4
+--------------------------------------------------------------------------------
+name:            BINARY UNCOMPRESSED DO:0 FPO:4 SZ:47/47/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: Jack, max: Tom, num_nulls: 0]
+favoriteNumber:  INT32 UNCOMPRESSED DO:0 FPO:51 SZ:41/41/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
+favoriteColor:   BINARY UNCOMPRESSED DO:0 FPO:92 SZ:55/55/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: green, max: yellow, num_nulls: 0]
+
+```
+
+With the `User` class defined in the package `org.apache.flink.formats.parquet.avro`:
+
+```java
+public class User {
+        private String name;
+        private Integer favoriteNumber;
+        private String favoriteColor;
+
+        public User() {}
+
+        public User(String name, Integer favoriteNumber, String favoriteColor) {
+            this.name = name;
+            this.favoriteNumber = favoriteNumber;
+            this.favoriteColor = favoriteColor;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public Integer getFavoriteNumber() {
+            return favoriteNumber;
+        }
+
+        public String getFavoriteColor() {
+            return favoriteColor;
+        }
+    }
+
+```
+
+you can write the following program to read Avro Reflect records of the `User` type from Parquet files:
 
 ```java
 final FileSource<GenericRecord> source =
         FileSource.forRecordStreamFormat(
-                AvroParquetReaders.forReflectRecord(Datum.class), /* Flink Path */)
-        .monitorContinuously(Duration.ofSeconds(1L))
+                AvroParquetReaders.forReflectRecord(User.class), /* Flink Path */)
         .build();
 
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableCheckpointing(10L);
-        
+
 final DataStream<GenericRecord> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
diff --git a/docs/content/docs/connectors/datastream/formats/parquet.md b/docs/content/docs/connectors/datastream/formats/parquet.md
index 71ed416..8a6f9f9 100644
--- a/docs/content/docs/connectors/datastream/formats/parquet.md
+++ b/docs/content/docs/connectors/datastream/formats/parquet.md
@@ -30,7 +30,7 @@ under the License.
 
 Flink supports reading [Parquet](https://parquet.apache.org/) files, 
 producing {{< javadoc file="org/apache/flink/table/data/RowData.html" name="Flink RowData">}} and producing [Avro](https://avro.apache.org/) records.
-To use the format you need to add the Flink Parquet dependency to your project:
+To use the format you need to add the `flink-parquet` dependency to your project:
 
 ```xml
 <dependency>
@@ -39,46 +39,78 @@ To use the format you need to add the Flink Parquet dependency to your project:
 	<version>{{< version >}}</version>
 </dependency>
 ```
- 
-This format is compatible with the new Source that can be used in both batch and streaming modes.
+
+To read Avro records, you will need to add the `parquet-avro` dependency:
+
+```xml
+<dependency>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet-avro</artifactId>
+    <version>1.12.2</version>
+    <optional>true</optional>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
+This format is compatible with the new Source that can be used in both batch and streaming execution modes.
 Thus, you can use this format for two kinds of data:
-- Bounded data
-- Unbounded data: monitors a directory for new files that appear 
 
-## Flink RowData
+- Bounded data: lists all files and reads them all.
+- Unbounded data: monitors a directory for new files that appear.
 
-#### Bounded data example
+{{< hint info >}}
+When you start a File Source, it is configured for bounded data by default.
+To configure the File Source for unbounded data, you must additionally call
+`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)`.
+{{< /hint >}}
 
-In this example, you will create a DataStream containing Parquet records as Flink RowDatas. The schema is projected to read only the specified fields ("f7", "f4" and "f99").  
-Flink will read records in batches of 500 records. The first boolean parameter specifies that timestamp columns will be interpreted as UTC.
-The second boolean instructs the application that the projected Parquet fields names are case-sensitive.
-There is no watermark strategy defined as records do not contain event timestamps.
+**Vectorized reader**
 
 ```java
-final LogicalType[] fieldTypes =
-  new LogicalType[] {
-  new DoubleType(), new IntType(), new VarCharType()
-  };
 
-final ParquetColumnarRowInputFormat<FileSourceSplit> format =
-  new ParquetColumnarRowInputFormat<>(
-  new Configuration(),
-  RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
-  500,
-  false,
-  true);
-final FileSource<RowData> source =
-  FileSource.forBulkFileFormat(format,  /* Flink Path */)
-  .build();
-final DataStream<RowData> stream =
-  env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+// Bounded data: Parquet rows are decoded in batches
+FileSource.forBulkFileFormat(BulkFormat, Path...)
+        .build();
+
+// Unbounded data: monitor the given Paths for new files
+FileSource.forBulkFileFormat(BulkFormat, Path...)
+        .monitorContinuously(Duration.ofMillis(5L))
+        .build();
+
 ```
 
-#### Unbounded data example
+**Avro Parquet reader**
 
-In this example, you will create a DataStream containing Parquet records as Flink RowDatas that will
-infinitely grow as new files are added to the directory. It will monitor for new files each second.
-The schema is projected to read only the specified fields ("f7", "f4" and "f99").  
+```java
+
+// Bounded data: read Avro records from the Parquet files
+FileSource.forRecordStreamFormat(StreamFormat, Path...)
+        .build();
+
+// Unbounded data: monitor the given Paths for new files
+FileSource.forRecordStreamFormat(StreamFormat, Path...)
+        .monitorContinuously(Duration.ofMillis(5L))
+        .build();
+
+```
+
+{{< hint info >}}
+The following examples are all configured for bounded data.
+To configure the File Source for unbounded data, you must additionally call
+`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)`.
+{{< /hint >}}
+
+## Flink RowData
+
+In this example, you will create a DataStream containing Parquet records as Flink RowDatas. The schema is projected to read only the specified fields ("f7", "f4" and "f99").  
 Flink will read records in batches of 500 records. The first boolean parameter specifies that timestamp columns will be interpreted as UTC.
 The second boolean instructs the application that the projected Parquet fields names are case-sensitive.
 There is no watermark strategy defined as records do not contain event timestamps.
@@ -98,7 +130,6 @@ final ParquetColumnarRowInputFormat<FileSourceSplit> format =
   true);
 final FileSource<RowData> source =
   FileSource.forBulkFileFormat(format,  /* Flink Path */)
-  .monitorContinuously(Duration.ofSeconds(1L))
   .build();
 final DataStream<RowData> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
@@ -131,9 +162,7 @@ This example uses an Avro schema example similar to the one described in the [of
 
 This schema defines a record representing a user with three fields: name, favoriteNumber, and favoriteColor. You can find more details at [record specification](https://avro.apache.org/docs/1.10.0/spec.html#schema_record) for how to define an Avro schema.
 
-#### Bounded data example
-
-In this example, you will create a DataStream containing Parquet records as Avro Generic records. 
+In the following example, you will create a DataStream containing Parquet records as Avro Generic records. 
 It will parse the Avro schema based on the JSON string. There are many other ways to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer to [Avro Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html) for details.
 After that, you will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Generic records.
 
@@ -163,37 +192,6 @@ final DataStream<GenericRecord> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
 
-#### Unbounded data example
-
-This example is similar to the bounded batch example. The application monitors for new files every second 
-and reads Avro Generic records from Parquet files infinitely as new files are added to the directory.
-```java
-// parsing avro schema
-final Schema schema =
-        new Schema.Parser()
-            .parse(
-                    "{\"type\": \"record\", "
-                        + "\"name\": \"User\", "
-                        + "\"fields\": [\n"
-                        + "        {\"name\": \"name\", \"type\": \"string\" },\n"
-                        + "        {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
-                        + "        {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
-                        + "    ]\n"
-                        + "    }");
-
-final FileSource<GenericRecord> source =
-        FileSource.forRecordStreamFormat(
-                AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
-        .monitorContinuously(Duration.ofSeconds(1L))
-        .build();
-
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(10L);
-        
-final DataStream<GenericRecord> stream =
-        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
-```
-
 ### Specific record
 
 Based on the previously defined schema, you can generate classes by leveraging Avro code generation. 
@@ -202,9 +200,7 @@ You can either use `avro-tools.jar` to generate code manually or you could use t
 code generation on any .avsc files present in the configured source directory. Please refer to 
 [Avro Getting Started](https://avro.apache.org/docs/1.10.0/gettingstartedjava.html) for more information.
 
-#### Bounded data example
-
-This example uses the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
+The following example uses the example schema [testdata.avsc](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/resources/avro/testdata.avsc):
 
 ```json lines
 [
@@ -247,26 +243,6 @@ final DataStream<GenericRecord> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
 
-#### Unbounded data example
-
-This example, similar to the bounded batch example, uses the same generated Address Java class 
-and monitors for the new files every second to read Avro Specific records from Parquet files 
-infinitely as new files are added to the directory.
-
-```java
-final FileSource<GenericRecord> source =
-        FileSource.forRecordStreamFormat(
-                AvroParquetReaders.forSpecificRecord(Address.class), /* Flink Path */)
-        .monitorContinuously(Duration.ofSeconds(1L))
-        .build();
-
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(10L);
-        
-final DataStream<GenericRecord> stream =
-        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
-```
-
 ### Reflect record
 
 Beyond Avro Generic and Specific record that requires a predefined Avro schema, 
@@ -274,8 +250,6 @@ Flink also supports creating a DataStream from Parquet files based on existing J
 In this case, Avro will use Java reflection to generate schemas and protocols for these POJO classes.
 Java types are mapped to Avro schemas, please refer to the [Avro reflect](https://avro.apache.org/docs/1.10.0/api/java/index.html) documentation for more details.
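+
+As a quick check, you can inspect the schema that Avro derives for a POJO. This is only a small
+sketch using Avro's `ReflectData` utility and the `Datum` class from the example below; it is not
+required for the File Source itself:
+
+```java
+// print the Avro schema generated by reflection for the POJO;
+// its namespace is the Java package of the class
+final Schema reflectedSchema = ReflectData.get().getSchema(Datum.class);
+System.out.println(reflectedSchema.toString(true));
+```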
 
-#### Bounded data example
-
 This example uses a simple Java POJO class [Datum](https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/Datum.java):
 
 ```java
@@ -329,22 +303,94 @@ final DataStream<GenericRecord> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```
 
-#### Unbounded data example
+#### Prerequisite for Parquet files
+
+In order to support reading Avro Reflect records, the Parquet file must contain specific meta information.
+The Avro schema used for creating the Parquet data must contain a `namespace`,
+which will be used by the program to identify the concrete Java class for the reflection process.
+
+The following example shows the `User` schema used previously, but this time it contains a namespace
+pointing to the location (in this case, the package) where the `User` class used for reflection can be found.
+
+```java
+// avro schema with namespace
+final String schema = 
+                    "{\"type\": \"record\", "
+                        + "\"name\": \"User\", "
+                        + "\"namespace\": \"org.apache.flink.formats.parquet.avro\", "
+                        + "\"fields\": [\n"
+                        + "        {\"name\": \"name\", \"type\": \"string\" },\n"
+                        + "        {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
+                        + "        {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
+                        + "    ]\n"
+                        + "    }";
+
+```
+
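+For illustration, a Parquet file using this schema could be produced with `AvroParquetWriter`
+from the `parquet-avro` dependency shown earlier. This is only a sketch based on the `parquet-avro`
+library (not the Flink API); it additionally assumes a Hadoop client on the classpath, and
+`/tmp/users.parquet` is just an example path:
+
+```java
+// parse the schema string defined above
+final Schema avroSchema = new Schema.Parser().parse(schema);
+
+// write a single record; the resulting file embeds the schema, including the
+// namespace, in its "parquet.avro.schema" meta information
+try (ParquetWriter<GenericRecord> writer =
+        AvroParquetWriter.<GenericRecord>builder(
+                new org.apache.hadoop.fs.Path("/tmp/users.parquet"))
+            .withSchema(avroSchema)
+            .build()) {
+    GenericRecord user = new GenericData.Record(avroSchema);
+    user.put("name", "Jack");
+    user.put("favoriteNumber", 1);
+    user.put("favoriteColor", "green");
+    writer.write(user);
+}
+```
+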
+Parquet files created with this schema will contain meta information like:
 
-This example, similar to the bounded batch example, uses the same POJO Java class `Datum`
-and monitors for the new files every second to read Avro Reflect records from Parquet files
-infinitely as new files are added to the directory.
+```text
+creator:        parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
+extra:          parquet.avro.schema =
+{"type":"record","name":"User","namespace":"org.apache.flink.formats.parquet.avro","fields":[{"name":"name","type":"string"},{"name":"favoriteNumber","type":["int","null"]},{"name":"favoriteColor","type":["string","null"]}]}
+extra:          writer.model.name = avro
+
+file schema:    org.apache.flink.formats.parquet.avro.User
+--------------------------------------------------------------------------------
+name:           REQUIRED BINARY L:STRING R:0 D:0
+favoriteNumber: OPTIONAL INT32 R:0 D:1
+favoriteColor:  OPTIONAL BINARY L:STRING R:0 D:1
+
+row group 1:    RC:3 TS:143 OFFSET:4
+--------------------------------------------------------------------------------
+name:            BINARY UNCOMPRESSED DO:0 FPO:4 SZ:47/47/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: Jack, max: Tom, num_nulls: 0]
+favoriteNumber:  INT32 UNCOMPRESSED DO:0 FPO:51 SZ:41/41/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
+favoriteColor:   BINARY UNCOMPRESSED DO:0 FPO:92 SZ:55/55/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: green, max: yellow, num_nulls: 0]
+
+```
+
+With the `User` class defined in the package `org.apache.flink.formats.parquet.avro`:
+
+```java
+public class User {
+        private String name;
+        private Integer favoriteNumber;
+        private String favoriteColor;
+
+        public User() {}
+
+        public User(String name, Integer favoriteNumber, String favoriteColor) {
+            this.name = name;
+            this.favoriteNumber = favoriteNumber;
+            this.favoriteColor = favoriteColor;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public Integer getFavoriteNumber() {
+            return favoriteNumber;
+        }
+
+        public String getFavoriteColor() {
+            return favoriteColor;
+        }
+    }
+
+```
+
+you can write the following program to read Avro Reflect records of the `User` type from Parquet files:
 
 ```java
 final FileSource<GenericRecord> source =
         FileSource.forRecordStreamFormat(
-                AvroParquetReaders.forReflectRecord(Datum.class), /* Flink Path */)
-        .monitorContinuously(Duration.ofSeconds(1L))
+                AvroParquetReaders.forReflectRecord(User.class), /* Flink Path */)
         .build();
 
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableCheckpointing(10L);
-        
+
 final DataStream<GenericRecord> stream =
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
 ```