Posted to commits@flink.apache.org by ar...@apache.org on 2021/12/06 18:51:12 UTC

[flink] branch release-1.14 updated (ef0e17a -> a87ca1e)

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a change to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from ef0e17a  [FLINK-24777][docs] Correct description of 'Processed (persisted) in-flight data'
     new 3b9aaca  [FLINK-24859][doc][formats] Make new formats name coherent
     new 47d496a  [FLINK-24859][doc][formats] document text file reading
     new a87ca1e  [FLINK-24859][doc][formats] document parquet file reading

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../docs/connectors/datastream/hybridsource.md     |  2 +-
 .../docs/connectors/datastream/formats/parquet.md  | 98 ++++++++++++++++++++++
 .../connectors/datastream/formats/text_files.md    | 68 +++++++++++++++
 .../docs/connectors/datastream/hybridsource.md     |  2 +-
 .../connector/base/source/hybrid/HybridSource.java |  2 +-
 ...extLineFormat.java => TextLineInputFormat.java} |  8 +-
 .../file/src/FileSourceTextLinesITCase.java        |  8 +-
 .../file/src/impl/FileSourceReaderTest.java        |  4 +-
 .../flink/connectors/hive/HiveSourceBuilder.java   |  4 +-
 .../hive/read/HiveCompactReaderFactory.java        |  4 +-
 ...BulkFormatAdapter.java => HiveInputFormat.java} | 12 +--
 .../nohive/OrcNoHiveColumnarRowInputFormat.java    | 10 +--
 ...tFormat.java => OrcColumnarRowInputFormat.java} | 10 +--
 .../org/apache/flink/orc/OrcFileFormatFactory.java |  2 +-
 ...est.java => OrcColumnarRowInputFormatTest.java} | 34 ++++----
 flink-python/pyflink/datastream/connectors.py      |  2 +-
 .../table/filesystem/LimitableBulkFormatTest.java  |  8 +-
 17 files changed, 224 insertions(+), 54 deletions(-)
 create mode 100644 docs/content/docs/connectors/datastream/formats/parquet.md
 create mode 100644 docs/content/docs/connectors/datastream/formats/text_files.md
 rename flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/{TextLineFormat.java => TextLineInputFormat.java} (93%)
 rename flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/{HiveBulkFormatAdapter.java => HiveInputFormat.java} (97%)
 rename flink-formats/flink-orc/src/main/java/org/apache/flink/orc/{OrcColumnarRowFileInputFormat.java => OrcColumnarRowInputFormat.java} (95%)
 rename flink-formats/flink-orc/src/test/java/org/apache/flink/orc/{OrcColumnarRowFileInputFormatTest.java => OrcColumnarRowInputFormatTest.java} (93%)

[flink] 03/03: [FLINK-24859][doc][formats] document parquet file reading

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a87ca1e04889a5c655d58fea1df54f9925c10409
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Nov 22 12:19:48 2021 +0100

    [FLINK-24859][doc][formats] document parquet file reading
---
 .../docs/connectors/datastream/formats/parquet.md  | 98 ++++++++++++++++++++++
 1 file changed, 98 insertions(+)

diff --git a/docs/content/docs/connectors/datastream/formats/parquet.md b/docs/content/docs/connectors/datastream/formats/parquet.md
new file mode 100644
index 0000000..f58eeeb
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/parquet.md
@@ -0,0 +1,98 @@
+---
+title:  "Parquet files"
+weight: 4
+type: docs
+aliases:
+- /dev/connectors/formats/parquet.html
+- /apis/streaming/connectors/formats/parquet.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# Parquet format
+
+Flink supports reading [Parquet](https://parquet.apache.org/) files and producing [Flink RowData](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/data/RowData.html).
+To use the format, add the Flink Parquet dependency to your project:
+
+```xml
+{{< artifact flink-parquet >}}
+```
+
+This format is compatible with the new Source that can be used in both batch and streaming modes.
+Thus, you can use this format in two ways:
+- Bounded read for batch mode
+- Continuous read for streaming mode: monitors a directory and reads new files as they appear
+
+**Bounded read example**:
+
+In this example we create a DataStream containing Parquet records as Flink RowData. We project the schema to read only certain fields ("f7", "f4" and "f99").
+We read records in batches of 500. The first boolean parameter (`false` here) specifies whether timestamp columns are interpreted as UTC.
+The second boolean parameter (`true` here) specifies whether the projected Parquet field names are matched in a case-sensitive way.
+No watermark strategy is needed because the records do not contain event timestamps.
+
+```java
+final LogicalType[] fieldTypes =
+    new LogicalType[] {
+      new DoubleType(), new IntType(), new VarCharType()
+    };
+
+final ParquetColumnarRowInputFormat<FileSourceSplit> format =
+    new ParquetColumnarRowInputFormat<>(
+        new Configuration(),
+        RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}), // projected fields
+        500, // batch size, in records
+        false, // interpret timestamp columns as UTC?
+        true); // match projected field names case-sensitively?
+final FileSource<RowData> source =
+    FileSource.forBulkFileFormat(format, /* Flink Path */)
+        .build();
+final DataStream<RowData> stream =
+    env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+```
+
+**Continuous read example**:
+
+In this example we create a DataStream containing Parquet records as Flink RowData that
+grows indefinitely as new files are added to the directory. We check for new files every second.
+We project the schema to read only certain fields ("f7", "f4" and "f99").
+We read records in batches of 500. The first boolean parameter (`false` here) specifies whether timestamp columns are interpreted as UTC.
+The second boolean parameter (`true` here) specifies whether the projected Parquet field names are matched in a case-sensitive way.
+No watermark strategy is needed because the records do not contain event timestamps.
+
+```java
+final LogicalType[] fieldTypes =
+    new LogicalType[] {
+      new DoubleType(), new IntType(), new VarCharType()
+    };
+
+final ParquetColumnarRowInputFormat<FileSourceSplit> format =
+    new ParquetColumnarRowInputFormat<>(
+        new Configuration(),
+        RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}), // projected fields
+        500, // batch size, in records
+        false, // interpret timestamp columns as UTC?
+        true); // match projected field names case-sensitively?
+final FileSource<RowData> source =
+    FileSource.forBulkFileFormat(format, /* Flink Path */)
+        .monitorContinuously(Duration.ofSeconds(1L)) // check for new files every second
+        .build();
+final DataStream<RowData> stream =
+    env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+```
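
Editorial note on the case-sensitivity flag described in parquet.md above: the following is a minimal, Flink-free sketch of what matching projected field names against a file's column names could look like with the flag on and off. It is an illustration only, not Flink's implementation. The class `ProjectionSketch`, the method `resolve`, and the sample column names are hypothetical, and what the real format does when a projected field has no match is outside the scope of this sketch.

```java
import java.util.Arrays;
import java.util.List;

/** Illustration only: hypothetical helper, not part of Flink. */
public final class ProjectionSketch {

    private ProjectionSketch() {}

    /**
     * Returns, for each projected field name, the index of the first matching
     * column in the file schema, or -1 if no column matches.
     */
    public static int[] resolve(
            List<String> fileColumns, List<String> projected, boolean caseSensitive) {
        int[] indexes = new int[projected.size()];
        for (int i = 0; i < projected.size(); i++) {
            String wanted = projected.get(i);
            indexes[i] = -1;
            for (int j = 0; j < fileColumns.size(); j++) {
                String column = fileColumns.get(j);
                boolean match =
                        caseSensitive
                                ? column.equals(wanted)
                                : column.equalsIgnoreCase(wanted);
                if (match) {
                    indexes[i] = j;
                    break;
                }
            }
        }
        return indexes;
    }

    public static void main(String[] args) {
        // Hypothetical file schema in which one column differs only by case.
        List<String> fileColumns = Arrays.asList("F4", "f7", "f99");
        List<String> projected = Arrays.asList("f7", "f4", "f99");

        // Case-sensitive: "f4" does not match "F4".
        System.out.println(Arrays.toString(resolve(fileColumns, projected, true)));
        // prints [1, -1, 2]

        // Case-insensitive: "f4" matches "F4".
        System.out.println(Arrays.toString(resolve(fileColumns, projected, false)));
        // prints [1, 0, 2]
    }
}
```

The point of the sketch is that, with case-sensitive matching, a projected name such as "f4" would not pick up a column stored as "F4", so the flag should agree with how the files were written.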

[flink] 01/03: [FLINK-24859][doc][formats] Make new formats name coherent

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b9aaca3c96a79c241568f4493f6b308be1eb4ce
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 10 12:19:38 2021 +0100

    [FLINK-24859][doc][formats] Make new formats name coherent
---
 .../docs/connectors/datastream/hybridsource.md     |  2 +-
 .../docs/connectors/datastream/hybridsource.md     |  2 +-
 .../connector/base/source/hybrid/HybridSource.java |  2 +-
 ...extLineFormat.java => TextLineInputFormat.java} |  8 ++---
 .../file/src/FileSourceTextLinesITCase.java        |  8 +++--
 .../file/src/impl/FileSourceReaderTest.java        |  4 +--
 .../flink/connectors/hive/HiveSourceBuilder.java   |  4 +--
 .../hive/read/HiveCompactReaderFactory.java        |  4 +--
 ...BulkFormatAdapter.java => HiveInputFormat.java} | 12 ++++----
 .../nohive/OrcNoHiveColumnarRowInputFormat.java    | 10 +++----
 ...tFormat.java => OrcColumnarRowInputFormat.java} | 10 +++----
 .../org/apache/flink/orc/OrcFileFormatFactory.java |  2 +-
 ...est.java => OrcColumnarRowInputFormatTest.java} | 34 +++++++++++-----------
 flink-python/pyflink/datastream/connectors.py      |  2 +-
 .../table/filesystem/LimitableBulkFormatTest.java  |  8 +++--
 15 files changed, 58 insertions(+), 54 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/hybridsource.md b/docs/content.zh/docs/connectors/datastream/hybridsource.md
index 02f5077..7058ac4 100644
--- a/docs/content.zh/docs/connectors/datastream/hybridsource.md
+++ b/docs/content.zh/docs/connectors/datastream/hybridsource.md
@@ -58,7 +58,7 @@ Each source covers an upfront known range and therefore the contained sources ca
 ```java
 long switchTimestamp = ...; // derive from file input paths
 FileSource<String> fileSource =
-  FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
+  FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
 KafkaSource<String> kafkaSource =
           KafkaSource.<String>builder()
                   .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
diff --git a/docs/content/docs/connectors/datastream/hybridsource.md b/docs/content/docs/connectors/datastream/hybridsource.md
index 02f5077..7058ac4 100644
--- a/docs/content/docs/connectors/datastream/hybridsource.md
+++ b/docs/content/docs/connectors/datastream/hybridsource.md
@@ -58,7 +58,7 @@ Each source covers an upfront known range and therefore the contained sources ca
 ```java
 long switchTimestamp = ...; // derive from file input paths
 FileSource<String> fileSource =
-  FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
+  FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
 KafkaSource<String> kafkaSource =
           KafkaSource.<String>builder()
                   .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
index 24acb6a..8df875b 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
@@ -41,7 +41,7 @@ import java.util.List;
  *
  * <pre>{@code
  * FileSource<String> fileSource =
- *   FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
+ *   FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
  * KafkaSource<String> kafkaSource =
  *           KafkaSource.<String>builder()
  *                   .setBootstrapServers("localhost:9092")
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java
similarity index 93%
rename from flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
rename to flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java
index c6fa3b1..00d906d 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java
@@ -42,7 +42,7 @@ import java.io.InputStreamReader;
  * with their internal buffering of stream input and charset decoder state.
  */
 @PublicEvolving
-public class TextLineFormat extends SimpleStreamFormat<String> {
+public class TextLineInputFormat extends SimpleStreamFormat<String> {
 
     private static final long serialVersionUID = 1L;
 
@@ -50,11 +50,11 @@ public class TextLineFormat extends SimpleStreamFormat<String> {
 
     private final String charsetName;
 
-    public TextLineFormat() {
+    public TextLineInputFormat() {
         this(DEFAULT_CHARSET_NAME);
     }
 
-    public TextLineFormat(String charsetName) {
+    public TextLineInputFormat(String charsetName) {
         this.charsetName = charsetName;
     }
 
@@ -72,7 +72,7 @@ public class TextLineFormat extends SimpleStreamFormat<String> {
 
     // ------------------------------------------------------------------------
 
-    /** The actual reader for the {@code TextLineFormat}. */
+    /** The actual reader for the {@code TextLineInputFormat}. */
     public static final class Reader implements StreamFormat.Reader<String> {
 
         private final BufferedReader reader;
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index eea0fa7..5f2a851 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.connector.file.src;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -119,7 +119,8 @@ public class FileSourceTextLinesITCase extends TestLogger {
         writeHiddenJunkFiles(testDir);
 
         final FileSource<String> source =
-                FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+                FileSource.forRecordStreamFormat(
+                                new TextLineInputFormat(), Path.fromLocalFile(testDir))
                         .build();
 
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -184,7 +185,8 @@ public class FileSourceTextLinesITCase extends TestLogger {
         final File testDir = TMP_FOLDER.newFolder();
 
         final FileSource<String> source =
-                FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+                FileSource.forRecordStreamFormat(
+                                new TextLineInputFormat(), Path.fromLocalFile(testDir))
                         .monitorContinuously(Duration.ofMillis(5))
                         .build();
 
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java
index a25a14c..d70745f 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.file.src.impl;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.core.fs.Path;
 
@@ -64,7 +64,7 @@ public class FileSourceReaderTest {
     private static FileSourceReader<String, FileSourceSplit> createReader(
             TestingReaderContext context) {
         return new FileSourceReader<>(
-                context, new StreamFormatAdapter<>(new TextLineFormat()), new Configuration());
+                context, new StreamFormatAdapter<>(new TextLineInputFormat()), new Configuration());
     }
 
     private static FileSourceSplit createTestFileSplit() throws IOException {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index 7b8442a..4b42073 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -25,8 +25,8 @@ import org.apache.flink.connector.file.src.ContinuousEnumerationSettings;
 import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
 import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
-import org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter;
 import org.apache.flink.connectors.hive.read.HiveContinuousPartitionFetcher;
+import org.apache.flink.connectors.hive.read.HiveInputFormat;
 import org.apache.flink.connectors.hive.read.HiveSourceSplit;
 import org.apache.flink.connectors.hive.util.HiveConfUtils;
 import org.apache.flink.connectors.hive.util.HivePartitionUtils;
@@ -310,7 +310,7 @@ public class HiveSourceBuilder {
 
     private BulkFormat<RowData, HiveSourceSplit> createDefaultBulkFormat() {
         return LimitableBulkFormat.create(
-                new HiveBulkFormatAdapter(
+                new HiveInputFormat(
                         new JobConfWrapper(jobConf),
                         partitionKeys,
                         fullSchema.getFieldNames(),
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
index 012ef01..47b36d9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
@@ -88,8 +88,8 @@ public class HiveCompactReaderFactory implements CompactReader.Factory<RowData>
     @Override
     public CompactReader<RowData> create(CompactContext context) throws IOException {
         HiveSourceSplit split = createSplit(context.getPath(), context.getFileSystem());
-        HiveBulkFormatAdapter format =
-                new HiveBulkFormatAdapter(
+        HiveInputFormat format =
+                new HiveInputFormat(
                         jobConfWrapper,
                         partitionKeys,
                         fieldNames,
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
similarity index 97%
rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
index 99aad05..6ab0dc9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
@@ -27,7 +27,7 @@ import org.apache.flink.connectors.hive.JobConfWrapper;
 import org.apache.flink.connectors.hive.util.HivePartitionUtils;
 import org.apache.flink.connectors.hive.util.JobConfUtils;
 import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
-import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
+import org.apache.flink.orc.OrcColumnarRowInputFormat;
 import org.apache.flink.orc.nohive.OrcNoHiveColumnarRowInputFormat;
 import org.apache.flink.orc.shim.OrcShim;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
@@ -62,11 +62,11 @@ import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_S
  * BulkFormat instances, because different hive partitions may need different BulkFormat to do the
  * reading.
  */
-public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSplit> {
+public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
 
     private static final long serialVersionUID = 1L;
 
-    private static final Logger LOG = LoggerFactory.getLogger(HiveBulkFormatAdapter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(HiveInputFormat.class);
 
     // schema evolution configs are not available in older versions of IOConstants, let's define
     // them ourselves
@@ -83,7 +83,7 @@ public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSpli
     private final boolean useMapRedReader;
     private final PartitionFieldExtractor<HiveSourceSplit> partitionFieldExtractor;
 
-    public HiveBulkFormatAdapter(
+    public HiveInputFormat(
             JobConfWrapper jobConfWrapper,
             List<String> partitionKeys,
             String[] fieldNames,
@@ -162,7 +162,7 @@ public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSpli
         }
     }
 
-    private OrcColumnarRowFileInputFormat<?, HiveSourceSplit> createOrcFormat() {
+    private OrcColumnarRowInputFormat<?, HiveSourceSplit> createOrcFormat() {
         return hiveVersion.startsWith("1.")
                 ? OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
                         jobConfWrapper.conf(),
@@ -172,7 +172,7 @@ public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSpli
                         computeSelectedFields(),
                         Collections.emptyList(),
                         DEFAULT_SIZE)
-                : OrcColumnarRowFileInputFormat.createPartitionedFormat(
+                : OrcColumnarRowInputFormat.createPartitionedFormat(
                         OrcShim.createShim(hiveVersion),
                         jobConfWrapper.conf(),
                         tableRowType(),
diff --git a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
index 47af6ad..b186faf 100644
--- a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
+++ b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
@@ -19,7 +19,7 @@
 package org.apache.flink.orc.nohive;
 
 import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
+import org.apache.flink.orc.OrcColumnarRowInputFormat;
 import org.apache.flink.orc.OrcFilters;
 import org.apache.flink.orc.nohive.shim.OrcNoHiveShim;
 import org.apache.flink.orc.vector.ColumnBatchFactory;
@@ -42,16 +42,16 @@ import static org.apache.flink.orc.OrcSplitReaderUtil.getSelectedOrcFields;
 import static org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector.createFlinkVector;
 import static org.apache.flink.orc.vector.AbstractOrcColumnVector.createFlinkVectorFromConstant;
 
-/** Helper class to create {@link OrcColumnarRowFileInputFormat} for no-hive. */
+/** Helper class to create {@link OrcColumnarRowInputFormat} for no-hive. */
 public class OrcNoHiveColumnarRowInputFormat {
     private OrcNoHiveColumnarRowInputFormat() {}
 
     /**
-     * Create a partitioned {@link OrcColumnarRowFileInputFormat}, the partition columns can be
+     * Create a partitioned {@link OrcColumnarRowInputFormat}, the partition columns can be
      * generated by split.
      */
     public static <SplitT extends FileSourceSplit>
-            OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
+            OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
                     Configuration hadoopConfig,
                     RowType tableType,
                     List<String> partitionKeys,
@@ -84,7 +84,7 @@ public class OrcNoHiveColumnarRowInputFormat {
                     return new VectorizedColumnBatch(vectors);
                 };
 
-        return new OrcColumnarRowFileInputFormat<>(
+        return new OrcColumnarRowInputFormat<>(
                 new OrcNoHiveShim(),
                 hadoopConfig,
                 convertToOrcTypeWithPart(tableFieldNames, tableFieldTypes, partitionKeys),
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java
similarity index 95%
rename from flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java
rename to flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java
index d88c967..bb12d51 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java
@@ -55,7 +55,7 @@ import static org.apache.flink.orc.vector.AbstractOrcColumnVector.createFlinkVec
  * fields, which can be extracted from path. Therefore, the {@link #getProducedType()} may be
  * different and types of extra fields need to be added.
  */
-public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSplit>
+public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
         extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> {
 
     private static final long serialVersionUID = 1L;
@@ -63,7 +63,7 @@ public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSpli
     private final ColumnBatchFactory<BatchT, SplitT> batchFactory;
     private final RowType projectedOutputType;
 
-    public OrcColumnarRowFileInputFormat(
+    public OrcColumnarRowInputFormat(
             final OrcShim<BatchT> shim,
             final Configuration hadoopConfig,
             final TypeDescription schema,
@@ -126,11 +126,11 @@ public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSpli
     }
 
     /**
-     * Create a partitioned {@link OrcColumnarRowFileInputFormat}, the partition columns can be
+     * Create a partitioned {@link OrcColumnarRowInputFormat}, the partition columns can be
      * generated by split.
      */
     public static <SplitT extends FileSourceSplit>
-            OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
+            OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
                     OrcShim<VectorizedRowBatch> shim,
                     Configuration hadoopConfig,
                     RowType tableType,
@@ -164,7 +164,7 @@ public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSpli
                     return new VectorizedColumnBatch(vectors);
                 };
 
-        return new OrcColumnarRowFileInputFormat<>(
+        return new OrcColumnarRowInputFormat<>(
                 shim,
                 hadoopConfig,
                 convertToOrcTypeWithPart(tableFieldNames, tableFieldTypes, partitionKeys),
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
index 7343959..853b3b8 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
@@ -129,7 +129,7 @@ public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
                                         FileSystemConnectorOptions.PARTITION_DEFAULT_NAME
                                                 .defaultValue());
 
-                return OrcColumnarRowFileInputFormat.createPartitionedFormat(
+                return OrcColumnarRowInputFormat.createPartitionedFormat(
                         OrcShim.defaultShim(),
                         conf,
                         tableType,
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
similarity index 93%
rename from flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java
rename to flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
index 2243d5c..5094866 100644
--- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
@@ -62,8 +62,8 @@ import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionP
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
-/** Test for {@link OrcColumnarRowFileInputFormat}. */
-public class OrcColumnarRowFileInputFormatTest {
+/** Test for {@link OrcColumnarRowInputFormat}. */
+public class OrcColumnarRowInputFormatTest {
 
     /** Small batch size for test more boundary conditions. */
     protected static final int BATCH_SIZE = 9;
@@ -97,7 +97,7 @@ public class OrcColumnarRowFileInputFormatTest {
 
     @Test
     public void testReadFileInSplits() throws IOException {
-        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
                 createFormat(FLAT_FILE_TYPE, new int[] {0, 1});
 
         AtomicInteger cnt = new AtomicInteger(0);
@@ -124,7 +124,7 @@ public class OrcColumnarRowFileInputFormatTest {
 
     @Test
     public void testReadFileWithSelectFields() throws IOException {
-        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
                 createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1});
 
         AtomicInteger cnt = new AtomicInteger(0);
@@ -153,7 +153,7 @@ public class OrcColumnarRowFileInputFormatTest {
 
     @Test
     public void testReadDecimalTypeFile() throws IOException {
-        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
                 createFormat(DECIMAL_FILE_TYPE, new int[] {0});
 
         AtomicInteger cnt = new AtomicInteger(0);
@@ -217,7 +217,7 @@ public class OrcColumnarRowFileInputFormatTest {
 
         int[] projectedFields = {8, 1, 3, 0, 5, 2};
 
-        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
                 createPartitionFormat(
                         tableType, new ArrayList<>(partSpec.keySet()), projectedFields);
 
@@ -257,7 +257,7 @@ public class OrcColumnarRowFileInputFormatTest {
 
     @Test
     public void testReadFileAndRestore() throws IOException {
-        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
                 createFormat(FLAT_FILE_TYPE, new int[] {0, 1});
 
         // pick a middle split
@@ -276,7 +276,7 @@ public class OrcColumnarRowFileInputFormatTest {
                                 new Between("_col0", PredicateLeaf.Type.LONG, 0L, 975000L),
                                 new Equals("_col0", PredicateLeaf.Type.LONG, 980001L),
                                 new Between("_col0", PredicateLeaf.Type.LONG, 990000L, 1800000L)));
-        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
                 createFormat(FLAT_FILE_TYPE, new int[] {0, 1}, filter);
 
         // pick a middle split
@@ -290,7 +290,7 @@ public class OrcColumnarRowFileInputFormatTest {
     }
 
     private void innerTestRestore(
-            OrcColumnarRowFileInputFormat<?, FileSourceSplit> format,
+            OrcColumnarRowInputFormat<?, FileSourceSplit> format,
             FileSourceSplit split,
             int breakCnt,
             int expectedCnt,
@@ -338,14 +338,14 @@ public class OrcColumnarRowFileInputFormatTest {
         assertEquals(expectedTotalF0, totalF0.get());
     }
 
-    protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createFormat(
+    protected OrcColumnarRowInputFormat<?, FileSourceSplit> createFormat(
             RowType formatType, int[] selectedFields) {
         return createFormat(formatType, selectedFields, new ArrayList<>());
     }
 
-    protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createFormat(
+    protected OrcColumnarRowInputFormat<?, FileSourceSplit> createFormat(
             RowType formatType, int[] selectedFields, List<Predicate> conjunctPredicates) {
-        return OrcColumnarRowFileInputFormat.createPartitionedFormat(
+        return OrcColumnarRowInputFormat.createPartitionedFormat(
                 OrcShim.defaultShim(),
                 new Configuration(),
                 formatType,
@@ -356,9 +356,9 @@ public class OrcColumnarRowFileInputFormatTest {
                 BATCH_SIZE);
     }
 
-    protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createPartitionFormat(
+    protected OrcColumnarRowInputFormat<?, FileSourceSplit> createPartitionFormat(
             RowType tableType, List<String> partitionKeys, int[] selectedFields) {
-        return OrcColumnarRowFileInputFormat.createPartitionedFormat(
+        return OrcColumnarRowInputFormat.createPartitionedFormat(
                 OrcShim.defaultShim(),
                 new Configuration(),
                 tableType,
@@ -370,13 +370,13 @@ public class OrcColumnarRowFileInputFormatTest {
     }
 
     private BulkFormat.Reader<RowData> createReader(
-            OrcColumnarRowFileInputFormat<?, FileSourceSplit> format, FileSourceSplit split)
+            OrcColumnarRowInputFormat<?, FileSourceSplit> format, FileSourceSplit split)
             throws IOException {
         return format.createReader(new org.apache.flink.configuration.Configuration(), split);
     }
 
     private BulkFormat.Reader<RowData> restoreReader(
-            OrcColumnarRowFileInputFormat<?, FileSourceSplit> format,
+            OrcColumnarRowInputFormat<?, FileSourceSplit> format,
             FileSourceSplit split,
             long offset,
             long recordSkipCount)
@@ -389,7 +389,7 @@ public class OrcColumnarRowFileInputFormatTest {
     }
 
     private void forEach(
-            OrcColumnarRowFileInputFormat<?, FileSourceSplit> format,
+            OrcColumnarRowInputFormat<?, FileSourceSplit> format,
             FileSourceSplit split,
             Consumer<RowData> action)
             throws IOException {
diff --git a/flink-python/pyflink/datastream/connectors.py b/flink-python/pyflink/datastream/connectors.py
index b4b3335..035cd62 100644
--- a/flink-python/pyflink/datastream/connectors.py
+++ b/flink-python/pyflink/datastream/connectors.py
@@ -757,7 +757,7 @@ class StreamFormat(object):
         :param charset_name: The charset to decode the byte stream.
         """
         j_stream_format = get_gateway().jvm.org.apache.flink.connector.file.src.reader. \
-            TextLineFormat(charset_name)
+            TextLineInputFormat(charset_name)
         return StreamFormat(j_stream_format)
 
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java
index 92a1d22..471af36 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.src.reader.StreamFormat;
-import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
 import org.apache.flink.connector.file.src.util.Utils;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.FileUtils;
@@ -56,7 +56,8 @@ public class LimitableBulkFormatTest {
 
         // read
         BulkFormat<String, FileSourceSplit> format =
-                LimitableBulkFormat.create(new StreamFormatAdapter<>(new TextLineFormat()), 22L);
+                LimitableBulkFormat.create(
+                        new StreamFormatAdapter<>(new TextLineInputFormat()), 22L);
 
         BulkFormat.Reader<String> reader =
                 format.createReader(
@@ -88,7 +89,8 @@ public class LimitableBulkFormatTest {
 
         // read
         BulkFormat<String, FileSourceSplit> format =
-                LimitableBulkFormat.create(new StreamFormatAdapter<>(new TextLineFormat()), limit);
+                LimitableBulkFormat.create(
+                        new StreamFormatAdapter<>(new TextLineInputFormat()), limit);
 
         BulkFormat.Reader<String> reader =
                 format.createReader(

[flink] 02/03: [FLINK-24859][doc][formats] document text file reading

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 47d496ab85940df618fd5c192680d1d9aa720964
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Nov 15 12:02:38 2021 +0100

    [FLINK-24859][doc][formats] document text file reading
---
 .../connectors/datastream/formats/text_files.md    | 68 ++++++++++++++++++++++
 1 file changed, 68 insertions(+)

diff --git a/docs/content/docs/connectors/datastream/formats/text_files.md b/docs/content/docs/connectors/datastream/formats/text_files.md
new file mode 100644
index 0000000..b79c9c6
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/text_files.md
@@ -0,0 +1,68 @@
+---
+title:  "Text files"
+weight: 4
+type: docs
+aliases:
+- /dev/connectors/formats/text_files.html
+- /apis/streaming/connectors/formats/text_files.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# Text files format
+
+Flink supports reading text lines from a file using `TextLineInputFormat`. This format uses Java's built-in `InputStreamReader` to decode the byte stream using one of the supported charset encodings.
+To use the format you need to add the Flink Connector Files dependency to your project:
+
+```xml
+{{< artifact flink-connector-files >}}
+```
+
+This format is compatible with the new Source that can be used in both batch and streaming modes.
+Thus, you can use this format in two ways:
+- Bounded read for batch mode
+- Continuous read for streaming mode: monitors a directory for new files that appear
+
+**Bounded read example**:
+
+In this example we create a DataStream containing the lines of a text file as Strings. 
+There is no need for a watermark strategy as records do not contain event timestamps.
+
+```java
+final FileSource<String> source =
+  FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
+  .build();
+final DataStream<String> stream =
+  env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+```
+
+**Continuous read example**:
+In this example we create a DataStream containing the lines of text files as Strings. The stream grows indefinitely
+as new files are added to the directory. We check the directory for new files every second.
+There is no need for a watermark strategy as records do not contain event timestamps.
+
+```java
+final FileSource<String> source =
+  FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
+  .monitorContinuously(Duration.ofSeconds(1L))
+  .build();
+final DataStream<String> stream =
+  env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+```