Posted to commits@flink.apache.org by ar...@apache.org on 2021/12/08 15:46:30 UTC

[flink] branch release-1.14 updated: Revert "[FLINK-24859][doc][formats] Make new formats name coherent"

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 94f269d  Revert "[FLINK-24859][doc][formats] Make new formats name coherent"
94f269d is described below

commit 94f269d2736ea8ad3860f8dde534f7fb075f0197
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Dec 8 10:35:54 2021 +0100

    Revert "[FLINK-24859][doc][formats] Make new formats name coherent"
    
    This reverts commit 50cd7bb2df1c384ccf5cfc5ed7808e177d62ee1c.
---
 .../docs/connectors/datastream/hybridsource.md     |  2 +-
 .../connectors/datastream/formats/text_files.md    |  6 ++--
 .../docs/connectors/datastream/hybridsource.md     |  2 +-
 .../connector/base/source/hybrid/HybridSource.java |  2 +-
 ...extLineInputFormat.java => TextLineFormat.java} |  8 ++---
 .../file/src/FileSourceTextLinesITCase.java        |  8 ++---
 .../file/src/impl/FileSourceReaderTest.java        |  4 +--
 .../flink/connectors/hive/HiveSourceBuilder.java   |  4 +--
 ...InputFormat.java => HiveBulkFormatAdapter.java} | 12 ++++----
 .../hive/read/HiveCompactReaderFactory.java        |  4 +--
 .../nohive/OrcNoHiveColumnarRowInputFormat.java    | 10 +++----
 ...mat.java => OrcColumnarRowFileInputFormat.java} | 10 +++----
 .../org/apache/flink/orc/OrcFileFormatFactory.java |  2 +-
 ...java => OrcColumnarRowFileInputFormatTest.java} | 34 +++++++++++-----------
 flink-python/pyflink/datastream/connectors.py      |  2 +-
 .../table/filesystem/LimitableBulkFormatTest.java  |  8 ++---
 16 files changed, 57 insertions(+), 61 deletions(-)
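
For readers skimming the patch: the net effect on release-1.14 is that the file-source text format keeps the name `TextLineFormat`, backing out the FLINK-24859 rename to `TextLineInputFormat` (along with the related Hive and ORC format renames below). A minimal sketch of the user-facing API as it stands on this branch after the revert; the input path and job name are illustrative:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TextLineReadExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // After this revert, release-1.14 exposes TextLineFormat; the renamed
        // TextLineInputFormat only exists where FLINK-24859 is applied.
        final FileSource<String> source =
                FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("/tmp/input"))
                        .build();

        // Bounded read of the directory; no watermarks needed for plain text lines.
        final DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
        stream.print();

        env.execute("text-line-read");
    }
}
```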

diff --git a/docs/content.zh/docs/connectors/datastream/hybridsource.md b/docs/content.zh/docs/connectors/datastream/hybridsource.md
index 7058ac4..02f5077 100644
--- a/docs/content.zh/docs/connectors/datastream/hybridsource.md
+++ b/docs/content.zh/docs/connectors/datastream/hybridsource.md
@@ -58,7 +58,7 @@ Each source covers an upfront known range and therefore the contained sources ca
 ```java
 long switchTimestamp = ...; // derive from file input paths
 FileSource<String> fileSource =
-  FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
+  FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
 KafkaSource<String> kafkaSource =
           KafkaSource.<String>builder()
                   .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
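
For context, the documentation example these hunks touch goes on to compose the two sources into a `HybridSource` that reads the bounded file input first and then switches to Kafka, along these lines (a sketch matching the surrounding doc, not part of the diff):

```java
HybridSource<String> hybridSource =
        HybridSource.builder(fileSource)
                .addSource(kafkaSource)
                .build();
```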
diff --git a/docs/content/docs/connectors/datastream/formats/text_files.md b/docs/content/docs/connectors/datastream/formats/text_files.md
index b79c9c6..7e9fcf6 100644
--- a/docs/content/docs/connectors/datastream/formats/text_files.md
+++ b/docs/content/docs/connectors/datastream/formats/text_files.md
@@ -28,7 +28,7 @@ under the License.
 
 # Text files format
 
-Flink supports reading text lines from a file using `TextLineInputFormat`. This format uses Java's built-in InputStreamReader to decode the byte stream using various supported charset encodings.
+Flink supports reading text lines from a file using `TextLineFormat`. This format uses Java's built-in InputStreamReader to decode the byte stream using various supported charset encodings.
 To use the format, you need to add the Flink file connector dependency to your project:
 
 ```xml
@@ -47,7 +47,7 @@ There is no need for a watermark strategy as records do not contain event timest
 
 ```java
 final FileSource<String> source =
-  FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
+  FileSource.forRecordStreamFormat(new TextLineFormat(), /* Flink Path */)
   .build();
 final DataStream<String> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
@@ -60,7 +60,7 @@ There is no need for a watermark strategy as records do not contain event timest
 
 ```java
 final FileSource<String> source =
-    FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
+    FileSource.forRecordStreamFormat(new TextLineFormat(), /* Flink Path */)
   .monitorContinuously(Duration.ofSeconds(1L))
   .build();
 final DataStream<String> stream =
diff --git a/docs/content/docs/connectors/datastream/hybridsource.md b/docs/content/docs/connectors/datastream/hybridsource.md
index 7058ac4..02f5077 100644
--- a/docs/content/docs/connectors/datastream/hybridsource.md
+++ b/docs/content/docs/connectors/datastream/hybridsource.md
@@ -58,7 +58,7 @@ Each source covers an upfront known range and therefore the contained sources ca
 ```java
 long switchTimestamp = ...; // derive from file input paths
 FileSource<String> fileSource =
-  FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
+  FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
 KafkaSource<String> kafkaSource =
           KafkaSource.<String>builder()
                   .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
index 8df875b..24acb6a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
@@ -41,7 +41,7 @@ import java.util.List;
  *
  * <pre>{@code
  * FileSource<String> fileSource =
- *   FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
+ *   FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
  * KafkaSource<String> kafkaSource =
  *           KafkaSource.<String>builder()
  *                   .setBootstrapServers("localhost:9092")
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
similarity index 93%
rename from flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java
rename to flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
index 00d906d..c6fa3b1 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
@@ -42,7 +42,7 @@ import java.io.InputStreamReader;
  * with their internal buffering of stream input and charset decoder state.
  */
 @PublicEvolving
-public class TextLineInputFormat extends SimpleStreamFormat<String> {
+public class TextLineFormat extends SimpleStreamFormat<String> {
 
     private static final long serialVersionUID = 1L;
 
@@ -50,11 +50,11 @@ public class TextLineInputFormat extends SimpleStreamFormat<String> {
 
     private final String charsetName;
 
-    public TextLineInputFormat() {
+    public TextLineFormat() {
         this(DEFAULT_CHARSET_NAME);
     }
 
-    public TextLineInputFormat(String charsetName) {
+    public TextLineFormat(String charsetName) {
         this.charsetName = charsetName;
     }
 
@@ -72,7 +72,7 @@ public class TextLineInputFormat extends SimpleStreamFormat<String> {
 
     // ------------------------------------------------------------------------
 
-    /** The actual reader for the {@code TextLineInputFormat}. */
+    /** The actual reader for the {@code TextLineFormat}. */
     public static final class Reader implements StreamFormat.Reader<String> {
 
         private final BufferedReader reader;
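
As the constructors in this hunk show, the format can be built with its default charset or with an explicit charset name. A short sketch of the non-default constructor, using the release-1.14 name; the charset and input directory are illustrative:

```java
// Decode ISO-8859-1 encoded files instead of the default charset.
FileSource<String> latin1Source =
        FileSource.forRecordStreamFormat(
                        new TextLineFormat("ISO-8859-1"), new Path("/data/latin1-logs"))
                .build();
```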
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index 5f2a851..eea0fa7 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.connector.file.src;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -119,8 +119,7 @@ public class FileSourceTextLinesITCase extends TestLogger {
         writeHiddenJunkFiles(testDir);
 
         final FileSource<String> source =
-                FileSource.forRecordStreamFormat(
-                                new TextLineInputFormat(), Path.fromLocalFile(testDir))
+                FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
                         .build();
 
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -185,8 +184,7 @@ public class FileSourceTextLinesITCase extends TestLogger {
         final File testDir = TMP_FOLDER.newFolder();
 
         final FileSource<String> source =
-                FileSource.forRecordStreamFormat(
-                                new TextLineInputFormat(), Path.fromLocalFile(testDir))
+                FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
                         .monitorContinuously(Duration.ofMillis(5))
                         .build();
 
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java
index d70745f..a25a14c 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.file.src.impl;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.core.fs.Path;
 
@@ -64,7 +64,7 @@ public class FileSourceReaderTest {
     private static FileSourceReader<String, FileSourceSplit> createReader(
             TestingReaderContext context) {
         return new FileSourceReader<>(
-                context, new StreamFormatAdapter<>(new TextLineInputFormat()), new Configuration());
+                context, new StreamFormatAdapter<>(new TextLineFormat()), new Configuration());
     }
 
     private static FileSourceSplit createTestFileSplit() throws IOException {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index 4b42073..7b8442a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -25,8 +25,8 @@ import org.apache.flink.connector.file.src.ContinuousEnumerationSettings;
 import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
 import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter;
 import org.apache.flink.connectors.hive.read.HiveContinuousPartitionFetcher;
-import org.apache.flink.connectors.hive.read.HiveInputFormat;
 import org.apache.flink.connectors.hive.read.HiveSourceSplit;
 import org.apache.flink.connectors.hive.util.HiveConfUtils;
 import org.apache.flink.connectors.hive.util.HivePartitionUtils;
@@ -310,7 +310,7 @@ public class HiveSourceBuilder {
 
     private BulkFormat<RowData, HiveSourceSplit> createDefaultBulkFormat() {
         return LimitableBulkFormat.create(
-                new HiveInputFormat(
+                new HiveBulkFormatAdapter(
                         new JobConfWrapper(jobConf),
                         partitionKeys,
                         fullSchema.getFieldNames(),
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
similarity index 97%
rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
index 6ab0dc9..99aad05 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
@@ -27,7 +27,7 @@ import org.apache.flink.connectors.hive.JobConfWrapper;
 import org.apache.flink.connectors.hive.util.HivePartitionUtils;
 import org.apache.flink.connectors.hive.util.JobConfUtils;
 import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
-import org.apache.flink.orc.OrcColumnarRowInputFormat;
+import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
 import org.apache.flink.orc.nohive.OrcNoHiveColumnarRowInputFormat;
 import org.apache.flink.orc.shim.OrcShim;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
@@ -62,11 +62,11 @@ import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_S
  * BulkFormat instances, because different hive partitions may need different BulkFormat to do the
  * reading.
  */
-public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
+public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSplit> {
 
     private static final long serialVersionUID = 1L;
 
-    private static final Logger LOG = LoggerFactory.getLogger(HiveInputFormat.class);
+    private static final Logger LOG = LoggerFactory.getLogger(HiveBulkFormatAdapter.class);
 
     // schema evolution configs are not available in older versions of IOConstants, let's define
     // them ourselves
@@ -83,7 +83,7 @@ public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
     private final boolean useMapRedReader;
     private final PartitionFieldExtractor<HiveSourceSplit> partitionFieldExtractor;
 
-    public HiveInputFormat(
+    public HiveBulkFormatAdapter(
             JobConfWrapper jobConfWrapper,
             List<String> partitionKeys,
             String[] fieldNames,
@@ -162,7 +162,7 @@ public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
         }
     }
 
-    private OrcColumnarRowInputFormat<?, HiveSourceSplit> createOrcFormat() {
+    private OrcColumnarRowFileInputFormat<?, HiveSourceSplit> createOrcFormat() {
         return hiveVersion.startsWith("1.")
                 ? OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
                         jobConfWrapper.conf(),
@@ -172,7 +172,7 @@ public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
                         computeSelectedFields(),
                         Collections.emptyList(),
                         DEFAULT_SIZE)
-                : OrcColumnarRowInputFormat.createPartitionedFormat(
+                : OrcColumnarRowFileInputFormat.createPartitionedFormat(
                         OrcShim.createShim(hiveVersion),
                         jobConfWrapper.conf(),
                         tableRowType(),
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
index 47b36d9..012ef01 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
@@ -88,8 +88,8 @@ public class HiveCompactReaderFactory implements CompactReader.Factory<RowData>
     @Override
     public CompactReader<RowData> create(CompactContext context) throws IOException {
         HiveSourceSplit split = createSplit(context.getPath(), context.getFileSystem());
-        HiveInputFormat format =
-                new HiveInputFormat(
+        HiveBulkFormatAdapter format =
+                new HiveBulkFormatAdapter(
                         jobConfWrapper,
                         partitionKeys,
                         fieldNames,
diff --git a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
index b186faf..47af6ad 100644
--- a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
+++ b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
@@ -19,7 +19,7 @@
 package org.apache.flink.orc.nohive;
 
 import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.orc.OrcColumnarRowInputFormat;
+import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
 import org.apache.flink.orc.OrcFilters;
 import org.apache.flink.orc.nohive.shim.OrcNoHiveShim;
 import org.apache.flink.orc.vector.ColumnBatchFactory;
@@ -42,16 +42,16 @@ import static org.apache.flink.orc.OrcSplitReaderUtil.getSelectedOrcFields;
 import static org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector.createFlinkVector;
 import static org.apache.flink.orc.vector.AbstractOrcColumnVector.createFlinkVectorFromConstant;
 
-/** Helper class to create {@link OrcColumnarRowInputFormat} for no-hive. */
+/** Helper class to create {@link OrcColumnarRowFileInputFormat} for no-hive. */
 public class OrcNoHiveColumnarRowInputFormat {
     private OrcNoHiveColumnarRowInputFormat() {}
 
     /**
-     * Create a partitioned {@link OrcColumnarRowInputFormat}; the partition columns can be
+     * Create a partitioned {@link OrcColumnarRowFileInputFormat}; the partition columns can be
      * generated by split.
      */
     public static <SplitT extends FileSourceSplit>
-            OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
+            OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
                     Configuration hadoopConfig,
                     RowType tableType,
                     List<String> partitionKeys,
@@ -84,7 +84,7 @@ public class OrcNoHiveColumnarRowInputFormat {
                     return new VectorizedColumnBatch(vectors);
                 };
 
-        return new OrcColumnarRowInputFormat<>(
+        return new OrcColumnarRowFileInputFormat<>(
                 new OrcNoHiveShim(),
                 hadoopConfig,
                 convertToOrcTypeWithPart(tableFieldNames, tableFieldTypes, partitionKeys),
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java
similarity index 95%
rename from flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java
rename to flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java
index bb12d51..d88c967 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java
@@ -55,7 +55,7 @@ import static org.apache.flink.orc.vector.AbstractOrcColumnVector.createFlinkVec
  * fields, which can be extracted from path. Therefore, the {@link #getProducedType()} may be
  * different and types of extra fields need to be added.
  */
-public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
+public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSplit>
         extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> {
 
     private static final long serialVersionUID = 1L;
@@ -63,7 +63,7 @@ public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
     private final ColumnBatchFactory<BatchT, SplitT> batchFactory;
     private final RowType projectedOutputType;
 
-    public OrcColumnarRowInputFormat(
+    public OrcColumnarRowFileInputFormat(
             final OrcShim<BatchT> shim,
             final Configuration hadoopConfig,
             final TypeDescription schema,
@@ -126,11 +126,11 @@ public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
     }
 
     /**
-     * Create a partitioned {@link OrcColumnarRowInputFormat}; the partition columns can be
+     * Create a partitioned {@link OrcColumnarRowFileInputFormat}; the partition columns can be
      * generated by split.
      */
     public static <SplitT extends FileSourceSplit>
-            OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
+            OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
                     OrcShim<VectorizedRowBatch> shim,
                     Configuration hadoopConfig,
                     RowType tableType,
@@ -164,7 +164,7 @@ public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
                     return new VectorizedColumnBatch(vectors);
                 };
 
-        return new OrcColumnarRowInputFormat<>(
+        return new OrcColumnarRowFileInputFormat<>(
                 shim,
                 hadoopConfig,
                 convertToOrcTypeWithPart(tableFieldNames, tableFieldTypes, partitionKeys),
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
index 853b3b8..7343959 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
@@ -129,7 +129,7 @@ public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
                                         FileSystemConnectorOptions.PARTITION_DEFAULT_NAME
                                                 .defaultValue());
 
-                return OrcColumnarRowInputFormat.createPartitionedFormat(
+                return OrcColumnarRowFileInputFormat.createPartitionedFormat(
                         OrcShim.defaultShim(),
                         conf,
                         tableType,
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java
similarity index 93%
rename from flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
rename to flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java
index 5094866..2243d5c 100644
--- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java
@@ -62,8 +62,8 @@ import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionP
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
-/** Test for {@link OrcColumnarRowInputFormat}. */
-public class OrcColumnarRowInputFormatTest {
+/** Test for {@link OrcColumnarRowFileInputFormat}. */
+public class OrcColumnarRowFileInputFormatTest {
 
     /** Small batch size to test more boundary conditions. */
     protected static final int BATCH_SIZE = 9;
@@ -97,7 +97,7 @@ public class OrcColumnarRowInputFormatTest {
 
     @Test
     public void testReadFileInSplits() throws IOException {
-        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
                 createFormat(FLAT_FILE_TYPE, new int[] {0, 1});
 
         AtomicInteger cnt = new AtomicInteger(0);
@@ -124,7 +124,7 @@ public class OrcColumnarRowInputFormatTest {
 
     @Test
     public void testReadFileWithSelectFields() throws IOException {
-        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
                 createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1});
 
         AtomicInteger cnt = new AtomicInteger(0);
@@ -153,7 +153,7 @@ public class OrcColumnarRowInputFormatTest {
 
     @Test
     public void testReadDecimalTypeFile() throws IOException {
-        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
                 createFormat(DECIMAL_FILE_TYPE, new int[] {0});
 
         AtomicInteger cnt = new AtomicInteger(0);
@@ -217,7 +217,7 @@ public class OrcColumnarRowInputFormatTest {
 
         int[] projectedFields = {8, 1, 3, 0, 5, 2};
 
-        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
                 createPartitionFormat(
                         tableType, new ArrayList<>(partSpec.keySet()), projectedFields);
 
@@ -257,7 +257,7 @@ public class OrcColumnarRowInputFormatTest {
 
     @Test
     public void testReadFileAndRestore() throws IOException {
-        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
                 createFormat(FLAT_FILE_TYPE, new int[] {0, 1});
 
         // pick a middle split
@@ -276,7 +276,7 @@ public class OrcColumnarRowInputFormatTest {
                                 new Between("_col0", PredicateLeaf.Type.LONG, 0L, 975000L),
                                 new Equals("_col0", PredicateLeaf.Type.LONG, 980001L),
                                 new Between("_col0", PredicateLeaf.Type.LONG, 990000L, 1800000L)));
-        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
                 createFormat(FLAT_FILE_TYPE, new int[] {0, 1}, filter);
 
         // pick a middle split
@@ -290,7 +290,7 @@ public class OrcColumnarRowInputFormatTest {
     }
 
     private void innerTestRestore(
-            OrcColumnarRowInputFormat<?, FileSourceSplit> format,
+            OrcColumnarRowFileInputFormat<?, FileSourceSplit> format,
             FileSourceSplit split,
             int breakCnt,
             int expectedCnt,
@@ -338,14 +338,14 @@ public class OrcColumnarRowInputFormatTest {
         assertEquals(expectedTotalF0, totalF0.get());
     }
 
-    protected OrcColumnarRowInputFormat<?, FileSourceSplit> createFormat(
+    protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createFormat(
             RowType formatType, int[] selectedFields) {
         return createFormat(formatType, selectedFields, new ArrayList<>());
     }
 
-    protected OrcColumnarRowInputFormat<?, FileSourceSplit> createFormat(
+    protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createFormat(
             RowType formatType, int[] selectedFields, List<Predicate> conjunctPredicates) {
-        return OrcColumnarRowInputFormat.createPartitionedFormat(
+        return OrcColumnarRowFileInputFormat.createPartitionedFormat(
                 OrcShim.defaultShim(),
                 new Configuration(),
                 formatType,
@@ -356,9 +356,9 @@ public class OrcColumnarRowInputFormatTest {
                 BATCH_SIZE);
     }
 
-    protected OrcColumnarRowInputFormat<?, FileSourceSplit> createPartitionFormat(
+    protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createPartitionFormat(
             RowType tableType, List<String> partitionKeys, int[] selectedFields) {
-        return OrcColumnarRowInputFormat.createPartitionedFormat(
+        return OrcColumnarRowFileInputFormat.createPartitionedFormat(
                 OrcShim.defaultShim(),
                 new Configuration(),
                 tableType,
@@ -370,13 +370,13 @@ public class OrcColumnarRowInputFormatTest {
     }
 
     private BulkFormat.Reader<RowData> createReader(
-            OrcColumnarRowInputFormat<?, FileSourceSplit> format, FileSourceSplit split)
+            OrcColumnarRowFileInputFormat<?, FileSourceSplit> format, FileSourceSplit split)
             throws IOException {
         return format.createReader(new org.apache.flink.configuration.Configuration(), split);
     }
 
     private BulkFormat.Reader<RowData> restoreReader(
-            OrcColumnarRowInputFormat<?, FileSourceSplit> format,
+            OrcColumnarRowFileInputFormat<?, FileSourceSplit> format,
             FileSourceSplit split,
             long offset,
             long recordSkipCount)
@@ -389,7 +389,7 @@ public class OrcColumnarRowInputFormatTest {
     }
 
     private void forEach(
-            OrcColumnarRowInputFormat<?, FileSourceSplit> format,
+            OrcColumnarRowFileInputFormat<?, FileSourceSplit> format,
             FileSourceSplit split,
             Consumer<RowData> action)
             throws IOException {
diff --git a/flink-python/pyflink/datastream/connectors.py b/flink-python/pyflink/datastream/connectors.py
index 035cd62..b4b3335 100644
--- a/flink-python/pyflink/datastream/connectors.py
+++ b/flink-python/pyflink/datastream/connectors.py
@@ -757,7 +757,7 @@ class StreamFormat(object):
         :param charset_name: The charset to decode the byte stream.
         """
         j_stream_format = get_gateway().jvm.org.apache.flink.connector.file.src.reader. \
-            TextLineInputFormat(charset_name)
+            TextLineFormat(charset_name)
         return StreamFormat(j_stream_format)
 
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java
index 471af36..92a1d22 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.src.reader.StreamFormat;
-import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.connector.file.src.util.Utils;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.FileUtils;
@@ -56,8 +56,7 @@ public class LimitableBulkFormatTest {
 
         // read
         BulkFormat<String, FileSourceSplit> format =
-                LimitableBulkFormat.create(
-                        new StreamFormatAdapter<>(new TextLineInputFormat()), 22L);
+                LimitableBulkFormat.create(new StreamFormatAdapter<>(new TextLineFormat()), 22L);
 
         BulkFormat.Reader<String> reader =
                 format.createReader(
@@ -89,8 +88,7 @@ public class LimitableBulkFormatTest {
 
         // read
         BulkFormat<String, FileSourceSplit> format =
-                LimitableBulkFormat.create(
-                        new StreamFormatAdapter<>(new TextLineInputFormat()), limit);
+                LimitableBulkFormat.create(new StreamFormatAdapter<>(new TextLineFormat()), limit);
 
         BulkFormat.Reader<String> reader =
                 format.createReader(
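
For reference, the pattern this test exercises: `LimitableBulkFormat.create` wraps another `BulkFormat` so that reading stops once the given record limit is reached. A minimal sketch with the release-1.14 names used above; the limit value is illustrative:

```java
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.table.filesystem.LimitableBulkFormat;

// Cap the number of text lines read at 100.
BulkFormat<String, FileSourceSplit> limited =
        LimitableBulkFormat.create(new StreamFormatAdapter<>(new TextLineFormat()), 100L);
```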