You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/12/08 15:46:30 UTC
[flink] branch release-1.14 updated: Revert "[FLINK-24859][doc][formats] Make new formats name coherent"
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 94f269d Revert "[FLINK-24859][doc][formats] Make new formats name coherent"
94f269d is described below
commit 94f269d2736ea8ad3860f8dde534f7fb075f0197
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Dec 8 10:35:54 2021 +0100
Revert "[FLINK-24859][doc][formats] Make new formats name coherent"
This reverts commit 50cd7bb2df1c384ccf5cfc5ed7808e177d62ee1c.
---
.../docs/connectors/datastream/hybridsource.md | 2 +-
.../connectors/datastream/formats/text_files.md | 6 ++--
.../docs/connectors/datastream/hybridsource.md | 2 +-
.../connector/base/source/hybrid/HybridSource.java | 2 +-
...extLineInputFormat.java => TextLineFormat.java} | 8 ++---
.../file/src/FileSourceTextLinesITCase.java | 8 ++---
.../file/src/impl/FileSourceReaderTest.java | 4 +--
.../flink/connectors/hive/HiveSourceBuilder.java | 4 +--
...InputFormat.java => HiveBulkFormatAdapter.java} | 12 ++++----
.../hive/read/HiveCompactReaderFactory.java | 4 +--
.../nohive/OrcNoHiveColumnarRowInputFormat.java | 10 +++----
...mat.java => OrcColumnarRowFileInputFormat.java} | 10 +++----
.../org/apache/flink/orc/OrcFileFormatFactory.java | 2 +-
...java => OrcColumnarRowFileInputFormatTest.java} | 34 +++++++++++-----------
flink-python/pyflink/datastream/connectors.py | 2 +-
.../table/filesystem/LimitableBulkFormatTest.java | 8 ++---
16 files changed, 57 insertions(+), 61 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/hybridsource.md b/docs/content.zh/docs/connectors/datastream/hybridsource.md
index 7058ac4..02f5077 100644
--- a/docs/content.zh/docs/connectors/datastream/hybridsource.md
+++ b/docs/content.zh/docs/connectors/datastream/hybridsource.md
@@ -58,7 +58,7 @@ Each source covers an upfront known range and therefore the contained sources ca
```java
long switchTimestamp = ...; // derive from file input paths
FileSource<String> fileSource =
- FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
+ FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
KafkaSource<String> kafkaSource =
KafkaSource.<String>builder()
.setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
diff --git a/docs/content/docs/connectors/datastream/formats/text_files.md b/docs/content/docs/connectors/datastream/formats/text_files.md
index b79c9c6..7e9fcf6 100644
--- a/docs/content/docs/connectors/datastream/formats/text_files.md
+++ b/docs/content/docs/connectors/datastream/formats/text_files.md
@@ -28,7 +28,7 @@ under the License.
# Text files format
-Flink supports reading from text lines from a file using `TextLineInputFormat`. This format uses Java's built-in InputStreamReader to decode the byte stream using various supported charset encodings.
+Flink supports reading from text lines from a file using `TextLineFormat`. This format uses Java's built-in InputStreamReader to decode the byte stream using various supported charset encodings.
To use the format you need to add the Flink Parquet dependency to your project:
```xml
@@ -47,7 +47,7 @@ There is no need for a watermark strategy as records do not contain event timest
```java
final FileSource<String> source =
- FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
+ FileSource.forRecordStreamFormat(new TextLineFormat(), /* Flink Path */)
.build();
final DataStream<String> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
@@ -60,7 +60,7 @@ There is no need for a watermark strategy as records do not contain event timest
```java
final FileSource<String> source =
- FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
+ FileSource.forRecordStreamFormat(new TextLineFormat(), /* Flink Path */)
.monitorContinuously(Duration.ofSeconds(1L))
.build();
final DataStream<String> stream =
diff --git a/docs/content/docs/connectors/datastream/hybridsource.md b/docs/content/docs/connectors/datastream/hybridsource.md
index 7058ac4..02f5077 100644
--- a/docs/content/docs/connectors/datastream/hybridsource.md
+++ b/docs/content/docs/connectors/datastream/hybridsource.md
@@ -58,7 +58,7 @@ Each source covers an upfront known range and therefore the contained sources ca
```java
long switchTimestamp = ...; // derive from file input paths
FileSource<String> fileSource =
- FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
+ FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
KafkaSource<String> kafkaSource =
KafkaSource.<String>builder()
.setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
index 8df875b..24acb6a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
@@ -41,7 +41,7 @@ import java.util.List;
*
* <pre>{@code
* FileSource<String> fileSource =
- * FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
+ * FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
* KafkaSource<String> kafkaSource =
* KafkaSource.<String>builder()
* .setBootstrapServers("localhost:9092")
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
similarity index 93%
rename from flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java
rename to flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
index 00d906d..c6fa3b1 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
@@ -42,7 +42,7 @@ import java.io.InputStreamReader;
* with their internal buffering of stream input and charset decoder state.
*/
@PublicEvolving
-public class TextLineInputFormat extends SimpleStreamFormat<String> {
+public class TextLineFormat extends SimpleStreamFormat<String> {
private static final long serialVersionUID = 1L;
@@ -50,11 +50,11 @@ public class TextLineInputFormat extends SimpleStreamFormat<String> {
private final String charsetName;
- public TextLineInputFormat() {
+ public TextLineFormat() {
this(DEFAULT_CHARSET_NAME);
}
- public TextLineInputFormat(String charsetName) {
+ public TextLineFormat(String charsetName) {
this.charsetName = charsetName;
}
@@ -72,7 +72,7 @@ public class TextLineInputFormat extends SimpleStreamFormat<String> {
// ------------------------------------------------------------------------
- /** The actual reader for the {@code TextLineInputFormat}. */
+ /** The actual reader for the {@code TextLineFormat}. */
public static final class Reader implements StreamFormat.Reader<String> {
private final BufferedReader reader;
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index 5f2a851..eea0fa7 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.connector.file.src;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -119,8 +119,7 @@ public class FileSourceTextLinesITCase extends TestLogger {
writeHiddenJunkFiles(testDir);
final FileSource<String> source =
- FileSource.forRecordStreamFormat(
- new TextLineInputFormat(), Path.fromLocalFile(testDir))
+ FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -185,8 +184,7 @@ public class FileSourceTextLinesITCase extends TestLogger {
final File testDir = TMP_FOLDER.newFolder();
final FileSource<String> source =
- FileSource.forRecordStreamFormat(
- new TextLineInputFormat(), Path.fromLocalFile(testDir))
+ FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
.monitorContinuously(Duration.ofMillis(5))
.build();
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java
index d70745f..a25a14c 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.file.src.impl;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.core.fs.Path;
@@ -64,7 +64,7 @@ public class FileSourceReaderTest {
private static FileSourceReader<String, FileSourceSplit> createReader(
TestingReaderContext context) {
return new FileSourceReader<>(
- context, new StreamFormatAdapter<>(new TextLineInputFormat()), new Configuration());
+ context, new StreamFormatAdapter<>(new TextLineFormat()), new Configuration());
}
private static FileSourceSplit createTestFileSplit() throws IOException {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index 4b42073..7b8442a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -25,8 +25,8 @@ import org.apache.flink.connector.file.src.ContinuousEnumerationSettings;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter;
import org.apache.flink.connectors.hive.read.HiveContinuousPartitionFetcher;
-import org.apache.flink.connectors.hive.read.HiveInputFormat;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
@@ -310,7 +310,7 @@ public class HiveSourceBuilder {
private BulkFormat<RowData, HiveSourceSplit> createDefaultBulkFormat() {
return LimitableBulkFormat.create(
- new HiveInputFormat(
+ new HiveBulkFormatAdapter(
new JobConfWrapper(jobConf),
partitionKeys,
fullSchema.getFieldNames(),
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
similarity index 97%
rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
index 6ab0dc9..99aad05 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
@@ -27,7 +27,7 @@ import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.connectors.hive.util.JobConfUtils;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
-import org.apache.flink.orc.OrcColumnarRowInputFormat;
+import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
import org.apache.flink.orc.nohive.OrcNoHiveColumnarRowInputFormat;
import org.apache.flink.orc.shim.OrcShim;
import org.apache.flink.table.catalog.hive.client.HiveShim;
@@ -62,11 +62,11 @@ import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_S
* BulkFormat instances, because different hive partitions may need different BulkFormat to do the
* reading.
*/
-public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
+public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSplit> {
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(HiveInputFormat.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HiveBulkFormatAdapter.class);
// schema evolution configs are not available in older versions of IOConstants, let's define
// them ourselves
@@ -83,7 +83,7 @@ public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
private final boolean useMapRedReader;
private final PartitionFieldExtractor<HiveSourceSplit> partitionFieldExtractor;
- public HiveInputFormat(
+ public HiveBulkFormatAdapter(
JobConfWrapper jobConfWrapper,
List<String> partitionKeys,
String[] fieldNames,
@@ -162,7 +162,7 @@ public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
}
}
- private OrcColumnarRowInputFormat<?, HiveSourceSplit> createOrcFormat() {
+ private OrcColumnarRowFileInputFormat<?, HiveSourceSplit> createOrcFormat() {
return hiveVersion.startsWith("1.")
? OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
jobConfWrapper.conf(),
@@ -172,7 +172,7 @@ public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
computeSelectedFields(),
Collections.emptyList(),
DEFAULT_SIZE)
- : OrcColumnarRowInputFormat.createPartitionedFormat(
+ : OrcColumnarRowFileInputFormat.createPartitionedFormat(
OrcShim.createShim(hiveVersion),
jobConfWrapper.conf(),
tableRowType(),
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
index 47b36d9..012ef01 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
@@ -88,8 +88,8 @@ public class HiveCompactReaderFactory implements CompactReader.Factory<RowData>
@Override
public CompactReader<RowData> create(CompactContext context) throws IOException {
HiveSourceSplit split = createSplit(context.getPath(), context.getFileSystem());
- HiveInputFormat format =
- new HiveInputFormat(
+ HiveBulkFormatAdapter format =
+ new HiveBulkFormatAdapter(
jobConfWrapper,
partitionKeys,
fieldNames,
diff --git a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
index b186faf..47af6ad 100644
--- a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
+++ b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
@@ -19,7 +19,7 @@
package org.apache.flink.orc.nohive;
import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.orc.OrcColumnarRowInputFormat;
+import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
import org.apache.flink.orc.OrcFilters;
import org.apache.flink.orc.nohive.shim.OrcNoHiveShim;
import org.apache.flink.orc.vector.ColumnBatchFactory;
@@ -42,16 +42,16 @@ import static org.apache.flink.orc.OrcSplitReaderUtil.getSelectedOrcFields;
import static org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector.createFlinkVector;
import static org.apache.flink.orc.vector.AbstractOrcColumnVector.createFlinkVectorFromConstant;
-/** Helper class to create {@link OrcColumnarRowInputFormat} for no-hive. */
+/** Helper class to create {@link OrcColumnarRowFileInputFormat} for no-hive. */
public class OrcNoHiveColumnarRowInputFormat {
private OrcNoHiveColumnarRowInputFormat() {}
/**
- * Create a partitioned {@link OrcColumnarRowInputFormat}, the partition columns can be
+ * Create a partitioned {@link OrcColumnarRowFileInputFormat}, the partition columns can be
* generated by split.
*/
public static <SplitT extends FileSourceSplit>
- OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
+ OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
Configuration hadoopConfig,
RowType tableType,
List<String> partitionKeys,
@@ -84,7 +84,7 @@ public class OrcNoHiveColumnarRowInputFormat {
return new VectorizedColumnBatch(vectors);
};
- return new OrcColumnarRowInputFormat<>(
+ return new OrcColumnarRowFileInputFormat<>(
new OrcNoHiveShim(),
hadoopConfig,
convertToOrcTypeWithPart(tableFieldNames, tableFieldTypes, partitionKeys),
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java
similarity index 95%
rename from flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java
rename to flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java
index bb12d51..d88c967 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java
@@ -55,7 +55,7 @@ import static org.apache.flink.orc.vector.AbstractOrcColumnVector.createFlinkVec
* fields, which can be extracted from path. Therefore, the {@link #getProducedType()} may be
* different and types of extra fields need to be added.
*/
-public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
+public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSplit>
extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> {
private static final long serialVersionUID = 1L;
@@ -63,7 +63,7 @@ public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
private final ColumnBatchFactory<BatchT, SplitT> batchFactory;
private final RowType projectedOutputType;
- public OrcColumnarRowInputFormat(
+ public OrcColumnarRowFileInputFormat(
final OrcShim<BatchT> shim,
final Configuration hadoopConfig,
final TypeDescription schema,
@@ -126,11 +126,11 @@ public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
}
/**
- * Create a partitioned {@link OrcColumnarRowInputFormat}, the partition columns can be
+ * Create a partitioned {@link OrcColumnarRowFileInputFormat}, the partition columns can be
* generated by split.
*/
public static <SplitT extends FileSourceSplit>
- OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
+ OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
OrcShim<VectorizedRowBatch> shim,
Configuration hadoopConfig,
RowType tableType,
@@ -164,7 +164,7 @@ public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
return new VectorizedColumnBatch(vectors);
};
- return new OrcColumnarRowInputFormat<>(
+ return new OrcColumnarRowFileInputFormat<>(
shim,
hadoopConfig,
convertToOrcTypeWithPart(tableFieldNames, tableFieldTypes, partitionKeys),
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
index 853b3b8..7343959 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
@@ -129,7 +129,7 @@ public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
FileSystemConnectorOptions.PARTITION_DEFAULT_NAME
.defaultValue());
- return OrcColumnarRowInputFormat.createPartitionedFormat(
+ return OrcColumnarRowFileInputFormat.createPartitionedFormat(
OrcShim.defaultShim(),
conf,
tableType,
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java
similarity index 93%
rename from flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
rename to flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java
index 5094866..2243d5c 100644
--- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java
@@ -62,8 +62,8 @@ import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionP
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-/** Test for {@link OrcColumnarRowInputFormat}. */
-public class OrcColumnarRowInputFormatTest {
+/** Test for {@link OrcColumnarRowFileInputFormat}. */
+public class OrcColumnarRowFileInputFormatTest {
/** Small batch size for test more boundary conditions. */
protected static final int BATCH_SIZE = 9;
@@ -97,7 +97,7 @@ public class OrcColumnarRowInputFormatTest {
@Test
public void testReadFileInSplits() throws IOException {
- OrcColumnarRowInputFormat<?, FileSourceSplit> format =
+ OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
createFormat(FLAT_FILE_TYPE, new int[] {0, 1});
AtomicInteger cnt = new AtomicInteger(0);
@@ -124,7 +124,7 @@ public class OrcColumnarRowInputFormatTest {
@Test
public void testReadFileWithSelectFields() throws IOException {
- OrcColumnarRowInputFormat<?, FileSourceSplit> format =
+ OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1});
AtomicInteger cnt = new AtomicInteger(0);
@@ -153,7 +153,7 @@ public class OrcColumnarRowInputFormatTest {
@Test
public void testReadDecimalTypeFile() throws IOException {
- OrcColumnarRowInputFormat<?, FileSourceSplit> format =
+ OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
createFormat(DECIMAL_FILE_TYPE, new int[] {0});
AtomicInteger cnt = new AtomicInteger(0);
@@ -217,7 +217,7 @@ public class OrcColumnarRowInputFormatTest {
int[] projectedFields = {8, 1, 3, 0, 5, 2};
- OrcColumnarRowInputFormat<?, FileSourceSplit> format =
+ OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
createPartitionFormat(
tableType, new ArrayList<>(partSpec.keySet()), projectedFields);
@@ -257,7 +257,7 @@ public class OrcColumnarRowInputFormatTest {
@Test
public void testReadFileAndRestore() throws IOException {
- OrcColumnarRowInputFormat<?, FileSourceSplit> format =
+ OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
createFormat(FLAT_FILE_TYPE, new int[] {0, 1});
// pick a middle split
@@ -276,7 +276,7 @@ public class OrcColumnarRowInputFormatTest {
new Between("_col0", PredicateLeaf.Type.LONG, 0L, 975000L),
new Equals("_col0", PredicateLeaf.Type.LONG, 980001L),
new Between("_col0", PredicateLeaf.Type.LONG, 990000L, 1800000L)));
- OrcColumnarRowInputFormat<?, FileSourceSplit> format =
+ OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
createFormat(FLAT_FILE_TYPE, new int[] {0, 1}, filter);
// pick a middle split
@@ -290,7 +290,7 @@ public class OrcColumnarRowInputFormatTest {
}
private void innerTestRestore(
- OrcColumnarRowInputFormat<?, FileSourceSplit> format,
+ OrcColumnarRowFileInputFormat<?, FileSourceSplit> format,
FileSourceSplit split,
int breakCnt,
int expectedCnt,
@@ -338,14 +338,14 @@ public class OrcColumnarRowInputFormatTest {
assertEquals(expectedTotalF0, totalF0.get());
}
- protected OrcColumnarRowInputFormat<?, FileSourceSplit> createFormat(
+ protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createFormat(
RowType formatType, int[] selectedFields) {
return createFormat(formatType, selectedFields, new ArrayList<>());
}
- protected OrcColumnarRowInputFormat<?, FileSourceSplit> createFormat(
+ protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createFormat(
RowType formatType, int[] selectedFields, List<Predicate> conjunctPredicates) {
- return OrcColumnarRowInputFormat.createPartitionedFormat(
+ return OrcColumnarRowFileInputFormat.createPartitionedFormat(
OrcShim.defaultShim(),
new Configuration(),
formatType,
@@ -356,9 +356,9 @@ public class OrcColumnarRowInputFormatTest {
BATCH_SIZE);
}
- protected OrcColumnarRowInputFormat<?, FileSourceSplit> createPartitionFormat(
+ protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createPartitionFormat(
RowType tableType, List<String> partitionKeys, int[] selectedFields) {
- return OrcColumnarRowInputFormat.createPartitionedFormat(
+ return OrcColumnarRowFileInputFormat.createPartitionedFormat(
OrcShim.defaultShim(),
new Configuration(),
tableType,
@@ -370,13 +370,13 @@ public class OrcColumnarRowInputFormatTest {
}
private BulkFormat.Reader<RowData> createReader(
- OrcColumnarRowInputFormat<?, FileSourceSplit> format, FileSourceSplit split)
+ OrcColumnarRowFileInputFormat<?, FileSourceSplit> format, FileSourceSplit split)
throws IOException {
return format.createReader(new org.apache.flink.configuration.Configuration(), split);
}
private BulkFormat.Reader<RowData> restoreReader(
- OrcColumnarRowInputFormat<?, FileSourceSplit> format,
+ OrcColumnarRowFileInputFormat<?, FileSourceSplit> format,
FileSourceSplit split,
long offset,
long recordSkipCount)
@@ -389,7 +389,7 @@ public class OrcColumnarRowInputFormatTest {
}
private void forEach(
- OrcColumnarRowInputFormat<?, FileSourceSplit> format,
+ OrcColumnarRowFileInputFormat<?, FileSourceSplit> format,
FileSourceSplit split,
Consumer<RowData> action)
throws IOException {
diff --git a/flink-python/pyflink/datastream/connectors.py b/flink-python/pyflink/datastream/connectors.py
index 035cd62..b4b3335 100644
--- a/flink-python/pyflink/datastream/connectors.py
+++ b/flink-python/pyflink/datastream/connectors.py
@@ -757,7 +757,7 @@ class StreamFormat(object):
:param charset_name: The charset to decode the byte stream.
"""
j_stream_format = get_gateway().jvm.org.apache.flink.connector.file.src.reader. \
- TextLineInputFormat(charset_name)
+ TextLineFormat(charset_name)
return StreamFormat(j_stream_format)
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java
index 471af36..92a1d22 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
-import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.file.src.util.Utils;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.FileUtils;
@@ -56,8 +56,7 @@ public class LimitableBulkFormatTest {
// read
BulkFormat<String, FileSourceSplit> format =
- LimitableBulkFormat.create(
- new StreamFormatAdapter<>(new TextLineInputFormat()), 22L);
+ LimitableBulkFormat.create(new StreamFormatAdapter<>(new TextLineFormat()), 22L);
BulkFormat.Reader<String> reader =
format.createReader(
@@ -89,8 +88,7 @@ public class LimitableBulkFormatTest {
// read
BulkFormat<String, FileSourceSplit> format =
- LimitableBulkFormat.create(
- new StreamFormatAdapter<>(new TextLineInputFormat()), limit);
+ LimitableBulkFormat.create(new StreamFormatAdapter<>(new TextLineFormat()), limit);
BulkFormat.Reader<String> reader =
format.createReader(