You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/20 06:27:24 UTC
[incubator-paimon] branch master updated: [code] Clean Flink related comment and naming (#642)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5945fefd9 [code] Clean Flink related comment and naming (#642)
5945fefd9 is described below
commit 5945fefd90c28af5ac1d9e1463ae76c3247c799b
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Mar 20 14:27:20 2023 +0800
[code] Clean Flink related comment and naming (#642)
---
.../org/apache/paimon/codegen/CodeGenLoader.java | 6 +-
.../apache/paimon/codegen/CodeGeneratorImpl.java | 1 -
.../apache/paimon/codegen/SortCodeGenerator.scala | 5 +-
.../org/apache/paimon/codegen/CompileUtils.java | 2 +-
.../org/apache/paimon/codegen/GeneratedClass.java | 2 +-
.../paimon/codegen/NormalizedKeyComputer.java | 2 +-
.../java/org/apache/paimon/codegen/Projection.java | 5 +-
.../apache/paimon/codegen/RecordComparator.java | 2 +-
.../org/apache/paimon/fs/local/LocalFileIO.java | 2 +-
.../apache/paimon/io/DataInputDeserializer.java | 2 -
.../paimon/io/DataInputViewStreamWrapper.java | 5 +-
.../org/apache/paimon/options/OptionsUtils.java | 51 ----------
.../apache/paimon/plugin/ComponentClassLoader.java | 2 -
.../org/apache/paimon/plugin/PluginLoader.java | 2 -
.../org/apache/paimon/utils/DateTimeUtils.java | 6 +-
.../java/org/apache/paimon/utils/HadoopUtils.java | 15 +--
.../main/java/org/apache/paimon/utils/Pool.java | 13 +--
.../org/apache/paimon/utils/Preconditions.java | 2 +-
.../java/org/apache/paimon/utils/ProjectedRow.java | 2 -
.../java/org/apache/paimon/utils/Projection.java | 2 -
.../org/apache/paimon/data/BinaryStringTest.java | 2 +-
.../apache/paimon/datagen/RowDataGenerator.java | 2 +-
.../apache/paimon/fs/HadoopConfigLoadingTest.java | 8 +-
.../org/apache/paimon/utils/CommonTestUtils.java | 112 +--------------------
.../src/main/java/org/apache/paimon/WriteMode.java | 2 +-
.../org/apache/paimon/casting/CastExecutor.java | 1 -
.../apache/paimon/disk/ChannelReaderInputView.java | 9 +-
.../paimon/disk/ChannelWriterOutputView.java | 7 +-
.../apache/paimon/disk/FileChannelManagerImpl.java | 2 +-
.../apache/paimon/sort/BinaryExternalMerger.java | 2 +-
.../paimon/sort/BinaryInMemorySortBuffer.java | 16 +--
.../apache/paimon/sort/BinaryIndexedSortable.java | 1 -
.../apache/paimon/utils/ExecutorThreadFactory.java | 15 +--
.../paimon/stats/FieldStatsCollectorTest.java | 14 +--
.../org/apache/paimon/utils/FailingFileIO.java | 2 +-
.../main/java/org/apache/paimon/oss/OSSFileIO.java | 4 +-
.../main/java/org/apache/paimon/s3/S3FileIO.java | 4 +-
.../paimon/format/avro/AbstractAvroBulkFormat.java | 6 +-
.../apache/paimon/format/avro/AvroFileFormat.java | 4 +-
.../paimon/format/avro/AvroSchemaConverter.java | 10 +-
.../format/avro/AvroToRowDataConverters.java | 4 +-
.../paimon/format/avro/AvroWriterFactory.java | 3 +-
.../format/avro/RowDataToAvroConverters.java | 6 +-
.../paimon/format/fs/HadoopReadOnlyFileSystem.java | 17 ++--
.../apache/paimon/format/orc/OrcFileFormat.java | 2 +-
.../apache/paimon/format/orc/OrcReaderFactory.java | 22 ++--
.../orc/ThreadLocalClassLoaderConfiguration.java | 4 +-
.../format/orc/reader/AbstractOrcColumnVector.java | 4 +-
.../format/orc/reader/OrcArrayColumnVector.java | 8 +-
.../format/orc/reader/OrcBytesColumnVector.java | 2 +-
.../format/orc/reader/OrcDecimalColumnVector.java | 2 +-
.../format/orc/reader/OrcDoubleColumnVector.java | 2 +-
.../format/orc/reader/OrcLongColumnVector.java | 2 +-
.../format/orc/reader/OrcMapColumnVector.java | 12 +--
.../format/orc/reader/OrcRowColumnVector.java | 6 +-
.../format/orc/reader/OrcSplitReaderUtil.java | 1 -
.../orc/reader/OrcTimestampColumnVector.java | 2 +-
.../format/orc/writer/PhysicalWriterImpl.java | 4 +-
.../format/parquet/ParquetReaderFactory.java | 2 +-
.../format/parquet/ParquetSchemaConverter.java | 2 +-
.../format/parquet/ParquetWriterFactory.java | 5 +-
.../parquet/writer/ParquetRowDataBuilder.java | 2 +-
.../writer/PositionOutputStreamAdapter.java | 6 +-
.../format/parquet/writer/StreamOutputFile.java | 2 +-
.../paimon/format/avro/AvroBulkFormatTest.java | 4 +-
.../java/org/apache/paimon/hive/HiveTypeUtils.java | 2 +-
.../java/org/apache/paimon/hive/HiveSchema.java | 4 +-
.../paimon/hive/mapred/PaimonOutputFormat.java | 2 +-
.../objectinspector/PaimonDateObjectInspector.java | 2 +-
.../apache/paimon/hive/HiveTableSchemaTest.java | 6 +-
.../paimon/hive/PaimonStorageHandlerITCase.java | 2 +-
.../apache/paimon/spark/SparkDataSourceReader.java | 2 +-
.../apache/paimon/spark/SparkInternalRowTest.java | 2 +-
.../org/apache/paimon/spark/SparkReadITCase.java | 2 +-
.../org/apache/paimon/spark/SparkTypeTest.java | 4 +-
.../org/apache/paimon/spark/SparkArrayData.java | 18 ++--
.../java/org/apache/paimon/spark/SparkCatalog.java | 8 +-
.../org/apache/paimon/spark/SparkInternalRow.java | 46 ++++-----
.../java/org/apache/paimon/spark/SparkRow.java | 30 +++---
.../java/org/apache/paimon/spark/SparkScan.java | 2 +-
.../java/org/apache/paimon/spark/SparkTable.java | 2 +-
.../org/apache/paimon/spark/SparkTypeUtils.java | 22 ++--
.../apache/paimon/spark/MinioTestContainer.java | 4 +-
.../apache/paimon/spark/SparkInternalRowTest.java | 2 +-
.../org/apache/paimon/spark/SparkReadTestBase.java | 2 +-
.../org/apache/paimon/spark/SparkTypeTest.java | 8 +-
paimon-spark/pom.xml | 2 -
87 files changed, 210 insertions(+), 442 deletions(-)
diff --git a/paimon-codegen-loader/src/main/java/org/apache/paimon/codegen/CodeGenLoader.java b/paimon-codegen-loader/src/main/java/org/apache/paimon/codegen/CodeGenLoader.java
index 50f8ecd4f..3dbf4db36 100644
--- a/paimon-codegen-loader/src/main/java/org/apache/paimon/codegen/CodeGenLoader.java
+++ b/paimon-codegen-loader/src/main/java/org/apache/paimon/codegen/CodeGenLoader.java
@@ -20,10 +20,10 @@ package org.apache.paimon.codegen;
import org.apache.paimon.plugin.PluginLoader;
-/** Copied and modified from the flink-table-planner-loader module. */
+/** Loader to load codegen classes. */
public class CodeGenLoader {
- private static final String FLINK_TABLE_STORE_CODEGEN_FAT_JAR = "paimon-codegen.jar";
+ private static final String CODEGEN_FAT_JAR = "paimon-codegen.jar";
// Singleton lazy initialization
@@ -32,7 +32,7 @@ public class CodeGenLoader {
private static synchronized PluginLoader getLoader() {
if (loader == null) {
// Avoid NoClassDefFoundError without cause by exception
- loader = new PluginLoader(FLINK_TABLE_STORE_CODEGEN_FAT_JAR);
+ loader = new PluginLoader(CODEGEN_FAT_JAR);
}
return loader;
}
diff --git a/paimon-codegen/src/main/java/org/apache/paimon/codegen/CodeGeneratorImpl.java b/paimon-codegen/src/main/java/org/apache/paimon/codegen/CodeGeneratorImpl.java
index c6e544d0d..eb3dd3d5e 100644
--- a/paimon-codegen/src/main/java/org/apache/paimon/codegen/CodeGeneratorImpl.java
+++ b/paimon-codegen/src/main/java/org/apache/paimon/codegen/CodeGeneratorImpl.java
@@ -56,7 +56,6 @@ public class CodeGeneratorImpl implements CodeGenerator {
private SortSpec getAscendingSortSpec(int numFields) {
SortSpec.SortSpecBuilder builder = SortSpec.builder();
for (int i = 0; i < numFields; i++) {
- // Flink's default null collation is NullCollation.LOW, see FlinkPlannerImpl
builder.addField(i, true, false);
}
return builder.build();
diff --git a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/SortCodeGenerator.scala b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/SortCodeGenerator.scala
index 30d3639ea..aa2efd524 100644
--- a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/SortCodeGenerator.scala
+++ b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/SortCodeGenerator.scala
@@ -182,11 +182,10 @@ class SortCodeGenerator(val input: RowType, val sortSpec: SortSpec) {
def generatePutNormalizedKeys(numKeyBytes: Int): mutable.ArrayBuffer[String] = {
/* Example generated code, for int:
if (record.isNullAt(0)) {
- org.apache.flink.table.data.binary.BinaryRowDataUtil.minNormalizedKey(target, offset+0, 5);
+ SortUtil.minNormalizedKey(target, offset+0, 5);
} else {
target.put(offset+0, (byte) 1);
- org.apache.flink.table.data.binary.BinaryRowDataUtil.putIntNormalizedKey(
- record.getInt(0), target, offset+1, 4);
+ SortUtil.putIntNormalizedKey(record.getInt(0), target, offset+1, 4);
}
*/
val putKeys = new mutable.ArrayBuffer[String]
diff --git a/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java b/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java
index fbe4235b2..ec45221cd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java
@@ -29,7 +29,7 @@ import java.util.Objects;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
-/** Utilities to compile a generated code to a Class. Copied from Flink. */
+/** Utilities to compile a generated code to a Class. */
public final class CompileUtils {
// used for logging the generated codes to a same place
diff --git a/paimon-common/src/main/java/org/apache/paimon/codegen/GeneratedClass.java b/paimon-common/src/main/java/org/apache/paimon/codegen/GeneratedClass.java
index d6791254b..97d639230 100644
--- a/paimon-common/src/main/java/org/apache/paimon/codegen/GeneratedClass.java
+++ b/paimon-common/src/main/java/org/apache/paimon/codegen/GeneratedClass.java
@@ -29,7 +29,7 @@ import static org.apache.paimon.utils.Preconditions.checkNotNull;
/**
* A wrapper for generated class, defines a {@link #newInstance(ClassLoader)} method to get an
- * instance by reference objects easily. Copied from Flink.
+ * instance by reference objects easily.
*/
public final class GeneratedClass<T> implements Serializable {
diff --git a/paimon-common/src/main/java/org/apache/paimon/codegen/NormalizedKeyComputer.java b/paimon-common/src/main/java/org/apache/paimon/codegen/NormalizedKeyComputer.java
index f0d8b46b4..0a59b5b67 100644
--- a/paimon-common/src/main/java/org/apache/paimon/codegen/NormalizedKeyComputer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/codegen/NormalizedKeyComputer.java
@@ -23,7 +23,7 @@ import org.apache.paimon.memory.MemorySegment;
/**
* Normalized key computer for {@code SortBuffer}. For performance, subclasses are usually
- * implemented through CodeGenerator. Copied from Flink.
+ * implemented through CodeGenerator.
*/
public interface NormalizedKeyComputer {
diff --git a/paimon-common/src/main/java/org/apache/paimon/codegen/Projection.java b/paimon-common/src/main/java/org/apache/paimon/codegen/Projection.java
index 81922588c..e872c512e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/codegen/Projection.java
+++ b/paimon-common/src/main/java/org/apache/paimon/codegen/Projection.java
@@ -21,10 +21,7 @@ package org.apache.paimon.codegen;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
-/**
- * Interface for code generated projection, which will map a RowData to another BinaryRowData.
- * Copied from Flink.
- */
+/** Interface for code generated projection, which will map a RowData to another BinaryRowData. */
public interface Projection {
BinaryRow apply(InternalRow row);
diff --git a/paimon-common/src/main/java/org/apache/paimon/codegen/RecordComparator.java b/paimon-common/src/main/java/org/apache/paimon/codegen/RecordComparator.java
index 8e3014183..8d58ca58d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/codegen/RecordComparator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/codegen/RecordComparator.java
@@ -25,7 +25,7 @@ import java.util.Comparator;
/**
* Record comparator for {@code BinaryInMemorySortBuffer}. For performance, subclasses are usually
- * implemented through CodeGenerator. A new interface for helping JVM inline. Copied from Flink.
+ * implemented through CodeGenerator. A new interface for helping JVM inline.
*/
public interface RecordComparator extends Comparator<InternalRow>, Serializable {
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
index 3f19bf316..bf6e3eab2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
@@ -93,7 +93,7 @@ public class LocalFileIO implements FileIO {
"File "
+ file
+ " does not exist or the user running "
- + "Flink ('"
+ + "Paimon ('"
+ System.getProperty("user.name")
+ "') has insufficient permissions to access it.");
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/DataInputDeserializer.java b/paimon-common/src/main/java/org/apache/paimon/io/DataInputDeserializer.java
index 77a34a918..5707466ed 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/DataInputDeserializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/DataInputDeserializer.java
@@ -71,8 +71,6 @@ public class DataInputDeserializer implements DataInputView, java.io.Serializabl
this.position = buffer.arrayOffset() + buffer.position();
this.end = this.position + buffer.remaining();
} else if (buffer.isDirect() || buffer.isReadOnly()) {
- // TODO: FLINK-8585 handle readonly and other non array based buffers more efficiently
- // without data copy
this.buffer = new byte[buffer.remaining()];
this.position = 0;
this.end = this.buffer.length;
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/DataInputViewStreamWrapper.java b/paimon-common/src/main/java/org/apache/paimon/io/DataInputViewStreamWrapper.java
index b14eddbd3..14f20ad8d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/DataInputViewStreamWrapper.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/DataInputViewStreamWrapper.java
@@ -23,10 +23,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-/**
- * Utility class that turns an {@link InputStream} into a {@link
- * org.apache.flink.core.memory.DataInputView}.
- */
+/** Utility class that turns an {@link InputStream} into a {@link DataInputView}. */
public class DataInputViewStreamWrapper extends DataInputStream implements DataInputView {
public DataInputViewStreamWrapper(InputStream in) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java b/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
index 54dc795a4..a0943208f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
@@ -20,12 +20,8 @@ package org.apache.paimon.options;
import org.apache.paimon.utils.TimeUtils;
-import javax.annotation.Nonnull;
-
-import java.io.File;
import java.time.Duration;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -36,53 +32,6 @@ import static org.apache.paimon.options.StructuredOptionsSplitter.escapeWithSing
/** Utility class for {@link Options} related helper functions. */
public class OptionsUtils {
- private static final String[] EMPTY = new String[0];
-
- /**
- * Parses a string as a map of strings. The expected format of the map is:
- *
- * <pre>
- * key1:value1,key2:value2
- * </pre>
- *
- * <p>Parts of the string can be escaped by wrapping with single or double quotes.
- *
- * @param stringSerializedMap a string to parse
- * @return parsed map
- */
- public static Map<String, String> parseMap(String stringSerializedMap) {
- return StructuredOptionsSplitter.splitEscaped(stringSerializedMap, ',').stream()
- .map(p -> StructuredOptionsSplitter.splitEscaped(p, ':'))
- .collect(
- Collectors.toMap(
- arr -> arr.get(0), // key name
- arr -> arr.get(1) // value
- ));
- }
-
- @Nonnull
- public static String[] splitPaths(@Nonnull String separatedPaths) {
- return separatedPaths.length() > 0
- ? separatedPaths.split(",|" + File.pathSeparator)
- : EMPTY;
- }
-
- /**
- * Extract and parse Flink configuration properties with a given name prefix and return the
- * result as a Map.
- */
- public static Map<String, String> getPrefixedKeyValuePairs(
- String prefix, Options configuration) {
- Map<String, String> result = new HashMap<>();
- for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
- if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
- String key = entry.getKey().substring(prefix.length());
- result.put(key, entry.getValue());
- }
- }
- return result;
- }
-
// --------------------------------------------------------------------------------------------
// Type conversion
// --------------------------------------------------------------------------------------------
diff --git a/paimon-common/src/main/java/org/apache/paimon/plugin/ComponentClassLoader.java b/paimon-common/src/main/java/org/apache/paimon/plugin/ComponentClassLoader.java
index 226fb9bc1..4e24ae2a3 100644
--- a/paimon-common/src/main/java/org/apache/paimon/plugin/ComponentClassLoader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/plugin/ComponentClassLoader.java
@@ -51,8 +51,6 @@ import java.util.Iterator;
* <li>component-first: component -> bootstrap -> owner; opt-in.
* <li>owner-first: owner -> component -> bootstrap; opt-in.
* </ul>
- *
- * <p>NOTE: Copied from Flink.
*/
public class ComponentClassLoader extends URLClassLoader {
private static final ClassLoader PLATFORM_OR_BOOTSTRAP_LOADER;
diff --git a/paimon-common/src/main/java/org/apache/paimon/plugin/PluginLoader.java b/paimon-common/src/main/java/org/apache/paimon/plugin/PluginLoader.java
index 43199c452..ea1bfc62b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/plugin/PluginLoader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/plugin/PluginLoader.java
@@ -51,8 +51,6 @@ public class PluginLoader {
Stream.concat(
Arrays.stream(PARENT_FIRST_LOGGING_PATTERNS),
Stream.of(
- // These packages are shipped either by
- // flink-table-runtime or flink-dist itself
"org.codehaus.janino",
"org.codehaus.commons",
"org.apache.commons.lang3"))
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
index 66d509970..6ed26cafd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
@@ -37,11 +37,7 @@ import static java.time.temporal.ChronoField.NANO_OF_SECOND;
import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;
import static java.time.temporal.ChronoField.YEAR;
-/**
- * Utils for date time.
- *
- * <p>NOTE: Copied from Flink.
- */
+/** Utils for date time. */
public class DateTimeUtils {
/** The julian date of the epoch, 1970-01-01. */
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java
index 2dc13cc8b..5197fac33 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java
@@ -89,12 +89,12 @@ public class HadoopUtils {
}
}
- // Approach 2: Flink configuration (deprecated)
+ // Approach 2: Paimon configuration (deprecated)
final String hdfsDefaultPath = options.getString(HDFS_DEFAULT_CONFIG, null);
if (hdfsDefaultPath != null) {
result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
LOG.debug(
- "Using hdfs-default configuration-file path from Flink config: {}",
+ "Using hdfs-default configuration-file path from Paimon config: {}",
hdfsDefaultPath);
foundHadoopConfiguration = true;
}
@@ -103,13 +103,14 @@ public class HadoopUtils {
if (hdfsSitePath != null) {
result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
LOG.debug(
- "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
+ "Using hdfs-site configuration-file path from Paimon config: {}", hdfsSitePath);
foundHadoopConfiguration = true;
}
final String hadoopConfigPath = options.getString(PATH_HADOOP_CONFIG, null);
if (hadoopConfigPath != null) {
- LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
+ LOG.debug(
+ "Searching Hadoop configuration files in Paimon config: {}", hadoopConfigPath);
foundHadoopConfiguration =
addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
}
@@ -122,8 +123,8 @@ public class HadoopUtils {
addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
}
- // Approach 4: Flink configuration
- // add all configuration key with prefix 'hadoop.' in flink conf to hadoop conf
+ // Approach 4: Paimon configuration
+ // add all configuration key with prefix 'hadoop.' in Paimon conf to hadoop conf
for (String key : options.keySet()) {
for (String prefix : CONFIG_PREFIXES) {
if (key.startsWith(prefix)) {
@@ -131,7 +132,7 @@ public class HadoopUtils {
String value = options.getString(key, null);
result.set(newKey, value);
LOG.debug(
- "Adding Flink config entry for {} as {}={} to Hadoop config",
+ "Adding Paimon config entry for {} as {}={} to Hadoop config",
key,
newKey,
value);
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java b/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java
index 5a9b2acbe..888bf8874 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java
@@ -18,22 +18,11 @@
package org.apache.paimon.utils;
-import org.apache.paimon.reader.RecordReader;
-
import javax.annotation.Nullable;
import java.util.concurrent.ArrayBlockingQueue;
-/**
- * A pool to cache and recycle heavyweight objects, to reduce object allocation.
- *
- * <p>This pool can be used in the {@link RecordReader}, when the returned objects are heavyweight
- * and need to be reused for efficiency. Because the reading happens in I/O threads while the record
- * processing happens in Flink's main processing threads, these objects cannot be reused immediately
- * after being returned. They can be reused, once they are recycled back to the pool.
- *
- * @param <T> The type of object cached in the pool.
- */
+/** A pool to cache and recycle heavyweight objects, to reduce object allocation. */
public class Pool<T> {
private final ArrayBlockingQueue<T> pool;
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Preconditions.java b/paimon-common/src/main/java/org/apache/paimon/utils/Preconditions.java
index 46441a23b..b90ea3b95 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Preconditions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Preconditions.java
@@ -27,7 +27,7 @@ import java.util.concurrent.ExecutionException;
* A collection of static utility methods to validate input.
*
* <p>This class is modelled after Google Guava's Preconditions class, and partly takes code from
- * that class. We add this code to the Flink code base in order to reduce external dependencies.
+ * that class. We add this code to the Paimon code base in order to reduce external dependencies.
*/
public final class Preconditions {
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
index 96c91520a..7c97e7652 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
@@ -36,8 +36,6 @@ import java.util.Arrays;
* <p>Projection includes both reducing the accessible fields and reordering them.
*
* <p>Note: This class supports only top-level projections, not nested projections.
- *
- * <p>NOTE: Copied from Flink.
*/
public class ProjectedRow implements InternalRow {
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Projection.java b/paimon-common/src/main/java/org/apache/paimon/utils/Projection.java
index d673a7724..bfdca8e71 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Projection.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Projection.java
@@ -38,8 +38,6 @@ import static org.apache.paimon.types.DataTypeRoot.ROW;
/**
* {@link Projection} represents a list of (possibly nested) indexes that can be used to project
* data types. A row projection includes both reducing the accessible fields and reordering them.
- *
- * <p>NOTE: Copied from Flink.
*/
public abstract class Projection {
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BinaryStringTest.java b/paimon-common/src/test/java/org/apache/paimon/data/BinaryStringTest.java
index b61d5bd24..c5c567389 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/BinaryStringTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/BinaryStringTest.java
@@ -131,7 +131,7 @@ public class BinaryStringTest {
checkBasic(",", 1);
checkBasic("hello", 5);
checkBasic("hello world", 11);
- checkBasic("Flink中文社区", 9);
+ checkBasic("Paimon中文社区", 10);
checkBasic("中 文 社 区", 7);
checkBasic("¡", 1); // 2 bytes char
diff --git a/paimon-common/src/test/java/org/apache/paimon/datagen/RowDataGenerator.java b/paimon-common/src/test/java/org/apache/paimon/datagen/RowDataGenerator.java
index 543b5257c..08497e1fb 100644
--- a/paimon-common/src/test/java/org/apache/paimon/datagen/RowDataGenerator.java
+++ b/paimon-common/src/test/java/org/apache/paimon/datagen/RowDataGenerator.java
@@ -21,7 +21,7 @@ package org.apache.paimon.datagen;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
-/** Data generator for Flink's internal {@link InternalRow} type. */
+/** Data generator for Paimon's internal {@link InternalRow} type. */
public class RowDataGenerator implements DataGenerator<InternalRow> {
private final DataGenerator<?>[] fieldGenerators;
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java
index 0a34adbea..50816dc5a 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java
@@ -39,7 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
- * Tests that validate the loading of the Hadoop configuration, relative to entries in the Flink
+ * Tests that validate the loading of the Hadoop configuration, relative to entries in the Paimon
* configuration and the environment variables.
*/
@SuppressWarnings("deprecation")
@@ -188,8 +188,8 @@ public class HadoopConfigLoadingTest {
final String k5 = "key5";
final String v1 = "from HADOOP_CONF_DIR";
- final String v2 = "from Flink config `fs.hdfs.hadoopconf`";
- final String v3 = "from Flink config `fs.hdfs.hdfsdefault`";
+ final String v2 = "from Paimon config `fs.hdfs.hadoopconf`";
+ final String v3 = "from Paimon config `fs.hdfs.hdfsdefault`";
final String v4 = "from HADOOP_HOME/etc/hadoop";
final String v5 = "from HADOOP_HOME/conf";
@@ -270,7 +270,7 @@ public class HadoopConfigLoadingTest {
}
@Test
- public void loadFromFlinkConfEntry() throws Exception {
+ public void loadFromPaimonConfEntry() {
final String prefix = "hadoop.";
final String k1 = "brooklyn";
diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java b/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java
index 76ba50f81..0121a918b 100644
--- a/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java
@@ -18,14 +18,6 @@
package org.apache.paimon.utils;
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Map;
@@ -41,72 +33,6 @@ import static org.junit.jupiter.api.Assertions.fail;
/** This class contains reusable utility methods for unit tests. */
public class CommonTestUtils {
- /**
- * Creates a copy of an object via Java Serialization.
- *
- * @param original The original object.
- * @return The copied object.
- */
- public static <T extends java.io.Serializable> T createCopySerializable(T original)
- throws IOException {
- if (original == null) {
- throw new IllegalArgumentException();
- }
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(original);
- oos.close();
- baos.close();
-
- ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-
- try (ObjectInputStream ois = new ObjectInputStream(bais)) {
- @SuppressWarnings("unchecked")
- T copy = (T) ois.readObject();
- return copy;
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- }
- }
-
- /**
- * Creates a temporary file that contains the given string. The file is written with the
- * platform's default encoding.
- *
- * <p>The temp file is automatically deleted on JVM exit.
- *
- * @param contents The contents to be written to the file.
- * @return The temp file URI.
- */
- public static String createTempFile(String contents) throws IOException {
- File f = File.createTempFile("flink_test_", ".tmp");
- f.deleteOnExit();
-
- try (BufferedWriter out = new BufferedWriter(new FileWriter(f))) {
- out.write(contents);
- }
- return f.toURI().toString();
- }
-
- /**
- * Permanently blocks the current thread. The thread cannot be woken up via {@link
- * Thread#interrupt()}.
- */
- public static void blockForeverNonInterruptibly() {
- final Object lock = new Object();
- //noinspection InfiniteLoopStatement
- while (true) {
- try {
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (lock) {
- lock.wait();
- }
- } catch (InterruptedException ignored) {
- }
- }
- }
-
// ------------------------------------------------------------------------
// Manipulation of environment
// ------------------------------------------------------------------------
@@ -133,6 +59,7 @@ public class CommonTestUtils {
// only for Windows
Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
try {
+ @SuppressWarnings("JavaReflectionMemberAccess")
Field theCaseInsensitiveEnvironmentField =
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
theCaseInsensitiveEnvironmentField.setAccessible(true);
@@ -150,28 +77,6 @@ public class CommonTestUtils {
}
}
- /**
- * Checks whether the given throwable contains the given cause as a cause. The cause is not
- * checked on equality but on type equality.
- *
- * @param throwable Throwable to check for the cause
- * @param cause Cause to look for
- * @return True if the given Throwable contains the given cause (type equality); otherwise false
- */
- public static boolean containsCause(Throwable throwable, Class<? extends Throwable> cause) {
- Throwable current = throwable;
-
- while (current != null) {
- if (cause.isAssignableFrom(current.getClass())) {
- return true;
- }
-
- current = current.getCause();
- }
-
- return false;
- }
-
/** Checks whether an exception with a message occurs when running a piece of code. */
public static void assertThrows(
String msg, Class<? extends Exception> expected, Callable<?> code) {
@@ -213,19 +118,4 @@ public class CommonTestUtils {
throw new TimeoutException(errorMsg);
}
}
-
- /**
- * Wait util the given condition is met or timeout.
- *
- * @param condition the condition to wait for.
- * @param timeout the maximum time to wait for the condition to become true.
- * @param errorMsg the error message to include in the <code>TimeoutException</code> if the
- * condition was not met before timeout.
- * @throws TimeoutException if the condition is not met before timeout.
- * @throws InterruptedException if the thread is interrupted.
- */
- public static void waitUtil(Supplier<Boolean> condition, Duration timeout, String errorMsg)
- throws TimeoutException, InterruptedException {
- waitUtil(condition, timeout, Duration.ofMillis(1), errorMsg);
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/WriteMode.java b/paimon-core/src/main/java/org/apache/paimon/WriteMode.java
index 13d221e7c..e815435ee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/WriteMode.java
+++ b/paimon-core/src/main/java/org/apache/paimon/WriteMode.java
@@ -23,7 +23,7 @@ import org.apache.paimon.options.description.InlineElement;
import static org.apache.paimon.options.description.TextElement.text;
-/** Defines the write mode for flink paimon. */
+/** Defines the write mode for paimon. */
public enum WriteMode implements DescribedEnum {
APPEND_ONLY(
"append-only",
diff --git a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
index 32e35bac5..10b831da3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
@@ -20,7 +20,6 @@ package org.apache.paimon.casting;
/**
* Interface to model a function that performs the casting of a value from one type to another.
- * Copied from flink.
*
* @param <IN> Input internal type
* @param <OUT> Output internal type
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java
index 5d1abea41..e0ecaa68c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java
@@ -21,6 +21,7 @@ package org.apache.paimon.disk;
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.BlockDecompressor;
import org.apache.paimon.data.AbstractPagedInputView;
+import org.apache.paimon.io.DataInputView;
import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;
@@ -30,10 +31,10 @@ import java.util.Collections;
import java.util.List;
/**
- * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a {@link
- * BufferFileReader}, making it effectively a data input stream. The view reads it data in blocks
- * from the underlying channel and decompress it before returning to caller. The view can only read
- * data that has been written by {@link ChannelWriterOutputView}, due to block formatting.
+ * A {@link DataInputView} that is backed by a {@link BufferFileReader}, making it effectively a
+ * data input stream. The view reads it data in blocks from the underlying channel and decompress it
+ * before returning to caller. The view can only read data that has been written by {@link
+ * ChannelWriterOutputView}, due to block formatting.
*/
public class ChannelReaderInputView extends AbstractPagedInputView {
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
index f157e03ab..41089e197 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
@@ -21,15 +21,16 @@ package org.apache.paimon.disk;
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.BlockCompressor;
import org.apache.paimon.data.AbstractPagedOutputView;
+import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;
import java.io.IOException;
/**
- * A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a {@link FileIOChannel},
- * making it effectively a data output stream. The view will compress its data before writing it in
- * blocks to the underlying channel.
+ * A {@link DataOutputView} that is backed by a {@link FileIOChannel}, making it effectively a data
+ * output stream. The view will compress its data before writing it in blocks to the underlying
+ * channel.
*/
public final class ChannelWriterOutputView extends AbstractPagedOutputView {
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java
index 8359a14af..d78a361c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java
@@ -66,7 +66,7 @@ public class FileChannelManagerImpl implements FileChannelManager {
File[] files = new File[tempDirs.length];
for (int i = 0; i < tempDirs.length; i++) {
File baseDir = new File(tempDirs[i]);
- String subfolder = String.format("flink-%s-%s", prefix, UUID.randomUUID());
+ String subfolder = String.format("paimon-%s-%s", prefix, UUID.randomUUID());
File storageDir = new File(baseDir, subfolder);
if (!storageDir.exists() && !storageDir.mkdirs()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalMerger.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalMerger.java
index 75dab68c2..08f63d7de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalMerger.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalMerger.java
@@ -33,7 +33,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
-/** Record merger for sort of BinaryRow. Copied from Flink. */
+/** Record merger for sort of BinaryRow. */
public class BinaryExternalMerger extends AbstractBinaryExternalMerger<BinaryRow> {
private final BinaryRowSerializer serializer;
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java
index 88c632f90..ec52a32e9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java
@@ -36,16 +36,11 @@ import java.util.ArrayList;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
- * In memory sort buffer for binary row. The main code is copied from Flink {@code
- * BinaryInMemorySortBuffer} instead of extended because it's a final class.
- *
- * <p>The main differences in the new sort buffer are:
+ * In memory sort buffer for binary row.
*
* <ul>
- * <li>1. Add clear method to clean all memory.
- * <li>2. Add tryInitialized() method to initialize memory before write and read in buffer, while
- * the old buffer will do it in the constructor and reset().
- * <li>3. Remove reset() and etc. methods which are not used in flink paimon.
+ * <li>{@link #clear}: Clean all memory.
+ * <li>{@link #tryInitialize}: initialize memory before write and read in buffer.
* </ul>
*/
public class BinaryInMemorySortBuffer extends BinaryIndexedSortable implements SortBuffer {
@@ -127,11 +122,6 @@ public class BinaryInMemorySortBuffer extends BinaryIndexedSortable implements S
}
}
- /**
- * We add clear() method here instead of reset() to release all memory segments. The reset()
- * method in flink sort buffer will clear memory and grab two buffers for
- * currentSortIndexSegment and recordCollector.
- */
@Override
public void clear() {
if (this.isInitialized) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryIndexedSortable.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryIndexedSortable.java
index 202dc0dce..68e907d44 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryIndexedSortable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryIndexedSortable.java
@@ -33,7 +33,6 @@ import java.util.ArrayList;
/**
* An abstract sortable, provide basic compare and swap. Support writing of index and normalizedKey.
- * Copied from Flink.
*/
public abstract class BinaryIndexedSortable implements IndexedSortable {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java
index d4b2261ea..4a4dd636a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java
@@ -26,9 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/**
- * A thread factory intended for use by critical thread pools. Critical thread pools here mean
- * thread pools that support Flink's core coordination and processing work, and which must not
- * simply cause unnoticed errors.
+ * A thread factory intended for use by critical thread pools.
*
* <p>The thread factory can be given an {@link Thread.UncaughtExceptionHandler} for the threads. If
* no handler is explicitly given, the default handler for uncaught exceptions will log the
@@ -42,9 +40,6 @@ import static org.apache.paimon.utils.Preconditions.checkNotNull;
*/
public class ExecutorThreadFactory implements ThreadFactory {
- /** The thread pool name used when no explicit pool name has been specified. */
- private static final String DEFAULT_POOL_NAME = "flink-executor-pool";
-
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ThreadGroup group;
@@ -57,14 +52,6 @@ public class ExecutorThreadFactory implements ThreadFactory {
// ------------------------------------------------------------------------
- /**
- * Creates a new thread factory using the default thread pool name ('flink-executor-pool') and
- * the default uncaught exception handler (log exception and kill process).
- */
- public ExecutorThreadFactory() {
- this(DEFAULT_POOL_NAME);
- }
-
/**
* Creates a new thread factory using the given thread pool name and the default uncaught
* exception handler (log exception and kill process).
diff --git a/paimon-core/src/test/java/org/apache/paimon/stats/FieldStatsCollectorTest.java b/paimon-core/src/test/java/org/apache/paimon/stats/FieldStatsCollectorTest.java
index 6d3d9f2b2..111d738c2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/stats/FieldStatsCollectorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/stats/FieldStatsCollectorTest.java
@@ -43,14 +43,14 @@ public class FieldStatsCollectorTest {
collector.collect(
GenericRow.of(
- 1, BinaryString.fromString("Flink"), new GenericArray(new int[] {1, 10})));
+ 1, BinaryString.fromString("Paimon"), new GenericArray(new int[] {1, 10})));
assertThat(collector.extract())
.isEqualTo(
new FieldStats[] {
new FieldStats(1, 1, 0L),
new FieldStats(
- BinaryString.fromString("Flink"),
- BinaryString.fromString("Flink"),
+ BinaryString.fromString("Paimon"),
+ BinaryString.fromString("Paimon"),
0L),
new FieldStats(null, null, 0L)
});
@@ -61,8 +61,8 @@ public class FieldStatsCollectorTest {
new FieldStats[] {
new FieldStats(1, 3, 0L),
new FieldStats(
- BinaryString.fromString("Flink"),
- BinaryString.fromString("Flink"),
+ BinaryString.fromString("Paimon"),
+ BinaryString.fromString("Paimon"),
1L),
new FieldStats(null, null, 0L)
});
@@ -78,7 +78,7 @@ public class FieldStatsCollectorTest {
new FieldStats(1, 3, 1L),
new FieldStats(
BinaryString.fromString("Apache"),
- BinaryString.fromString("Flink"),
+ BinaryString.fromString("Paimon"),
1L),
new FieldStats(null, null, 0L)
});
@@ -90,7 +90,7 @@ public class FieldStatsCollectorTest {
new FieldStats(1, 3, 1L),
new FieldStats(
BinaryString.fromString("Apache"),
- BinaryString.fromString("Flink"),
+ BinaryString.fromString("Paimon"),
1L),
new FieldStats(null, null, 1L)
});
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FailingFileIO.java b/paimon-core/src/test/java/org/apache/paimon/utils/FailingFileIO.java
index 022b03eef..0c45cedb7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/FailingFileIO.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/FailingFileIO.java
@@ -84,7 +84,7 @@ public class FailingFileIO extends TraceableFileIO {
throw new FileNotFoundException(
"File "
+ path
- + " does not exist or the user running Flink ('"
+ + " does not exist or the user running Paimon ('"
+ System.getProperty("user.name")
+ "') has insufficient permissions to access it.");
}
diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
index 82e3cfdc1..6ecc76da3 100644
--- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
+++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
@@ -44,8 +44,8 @@ public class OSSFileIO extends HadoopCompliantFileIO {
private static final Logger LOG = LoggerFactory.getLogger(OSSFileIO.class);
/**
- * In order to simplify, we make flink oss configuration keys same with hadoop oss module. So,
- * we add all configuration key with prefix `fs.oss` in flink conf to hadoop conf.
+ * In order to simplify, we make paimon oss configuration keys same with hadoop oss module. So,
+ * we add all configuration key with prefix `fs.oss` in paimon conf to hadoop conf.
*/
private static final String[] CONFIG_PREFIXES = {"fs.oss."};
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
index 8caf3b574..a58aa18ea 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
@@ -68,11 +68,11 @@ public class S3FileIO extends HadoopCompliantFileIO {
@Override
public void configure(CatalogContext context) {
- this.hadoopOptions = mirrorCertainHadoopConfig(loadHadoopConfigFromFlink(context));
+ this.hadoopOptions = mirrorCertainHadoopConfig(loadHadoopConfigFromContext(context));
}
// add additional config entries from the IO config to the Hadoop config
- private Options loadHadoopConfigFromFlink(CatalogContext context) {
+ private Options loadHadoopConfigFromContext(CatalogContext context) {
Options hadoopConfig = new Options();
for (String key : context.options().keySet()) {
for (String prefix : CONFIG_PREFIXES) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AbstractAvroBulkFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AbstractAvroBulkFormat.java
index 622978b08..2ce1f7c18 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AbstractAvroBulkFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AbstractAvroBulkFormat.java
@@ -39,11 +39,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.function.Function;
-/**
- * Provides a {@link FormatReaderFactory} for Avro records.
- *
- * <p>NOTE: Copied from Flink.
- */
+/** Provides a {@link FormatReaderFactory} for Avro records. */
public abstract class AbstractAvroBulkFormat<A> implements FormatReaderFactory {
private static final long serialVersionUID = 1L;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
index 05571e314..ee4cff70f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
@@ -49,7 +49,7 @@ import java.util.function.Function;
import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
-/** Avro {@link FileFormat}. The main code is copied from Flink {@code AvroFileFormatFactory}. */
+/** Avro {@link FileFormat}. */
public class AvroFileFormat extends FileFormat {
public static final String IDENTIFIER = "avro";
@@ -74,8 +74,6 @@ public class AvroFileFormat extends FileFormat {
// if the schema given to the reader is not equal to the schema in header,
// reader will automatically map the fields and give back records with our desired
// schema
- //
- // for detailed discussion see comments in https://github.com/apache/flink/pull/18657
DataType producedType = Projection.of(projection).project(type);
return new AvroGenericRecordBulkFormat((RowType) producedType.copy(false));
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
index 99dcfe4bc..86fc18110 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
@@ -35,7 +35,7 @@ import org.apache.avro.SchemaBuilder;
import java.util.List;
-/** Converts an Avro schema into Flink's type information. */
+/** Converts an Avro schema into Paimon's type information. */
public class AvroSchemaConverter {
private AvroSchemaConverter() {
@@ -43,20 +43,20 @@ public class AvroSchemaConverter {
}
/**
- * Converts Flink SQL {@link DataType} (can be nested) into an Avro schema.
+ * Converts Paimon {@link DataType} (can be nested) into an Avro schema.
*
- * <p>Use "org.apache.flink.avro.generated.record" as the type name.
+ * <p>Use "org.apache.paimon.avro.generated.record" as the type name.
*
* @param schema the schema type, usually it should be the top level record type, e.g. not a
* nested type
* @return Avro's {@link Schema} matching this logical type.
*/
public static Schema convertToSchema(DataType schema) {
- return convertToSchema(schema, "org.apache.flink.avro.generated.record");
+ return convertToSchema(schema, "org.apache.paimon.avro.generated.record");
}
/**
- * Converts Flink SQL {@link DataType} (can be nested) into an Avro schema.
+ * Converts Paimon {@link DataType} (can be nested) into an Avro schema.
*
* <p>The "{rowName}_" is used as the nested row type name prefix in order to generate the right
* schema. Nested record type that only differs with type name is still compatible.
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroToRowDataConverters.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroToRowDataConverters.java
index 3ccb9166a..cf15f4f41 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroToRowDataConverters.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroToRowDataConverters.java
@@ -53,8 +53,8 @@ import static org.apache.paimon.format.avro.AvroSchemaConverter.extractValueType
public class AvroToRowDataConverters {
/**
- * Runtime converter that converts Avro data structures into objects of Flink Table & SQL
- * internal data structures.
+ * Runtime converter that converts Avro data structures into objects of Paimon internal data
+ * structures.
*/
@FunctionalInterface
public interface AvroToRowDataConverter extends Serializable {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroWriterFactory.java
index 7aca7fde6..5dd08e413 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroWriterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroWriterFactory.java
@@ -25,8 +25,7 @@ import org.apache.avro.file.DataFileWriter;
import java.io.IOException;
/**
- * A factory that creates an {@link AvroBulkWriter}. The factory takes a user-supplied builder to
- * assemble Parquet's writer and then turns it into a Flink {@code BulkWriter}.
+ * A factory that creates an {@link AvroBulkWriter}.
*
* @param <T> The type of record to write.
*/
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/RowDataToAvroConverters.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/RowDataToAvroConverters.java
index d23d97a19..a89e8d070 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/RowDataToAvroConverters.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/RowDataToAvroConverters.java
@@ -50,8 +50,8 @@ public class RowDataToAvroConverters {
// --------------------------------------------------------------------------------
/**
- * Runtime converter that converts objects of Flink Table & SQL internal data structures to
- * corresponding Avro data structures.
+ * Runtime converter that converts objects of Paimon internal data structures to corresponding
+ * Avro data structures.
*/
@FunctionalInterface
public interface RowDataToAvroConverter extends Serializable {
@@ -67,7 +67,7 @@ public class RowDataToAvroConverters {
/**
* Creates a runtime converter according to the given logical type that converts objects of
- * Flink Table & SQL internal data structures to corresponding Avro data structures.
+ * Paimon internal data structures to corresponding Avro data structures.
*/
public static RowDataToAvroConverter createConverter(DataType type) {
final RowDataToAvroConverter converter;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java b/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java
index f1979fa38..1dcc0da00 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.fs;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.utils.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -52,7 +53,7 @@ public class HadoopReadOnlyFileSystem extends FileSystem {
@Override
public FSDataInputStream open(Path path) throws IOException {
return new FSDataInputStream(
- new FSDataWrappedInputStream(fileIO.newInputStream(toFlinkPath(path))));
+ new FSDataWrappedInputStream(fileIO.newInputStream(toPaimonPath(path))));
}
@Override
@@ -62,10 +63,10 @@ public class HadoopReadOnlyFileSystem extends FileSystem {
@Override
public FileStatus getFileStatus(Path path) throws IOException {
- return toHadoopStatus(fileIO.getFileStatus(toFlinkPath(path)));
+ return toHadoopStatus(fileIO.getFileStatus(toPaimonPath(path)));
}
- private static org.apache.paimon.fs.Path toFlinkPath(Path path) {
+ private static org.apache.paimon.fs.Path toPaimonPath(Path path) {
return new org.apache.paimon.fs.Path(path.toUri());
}
@@ -138,17 +139,13 @@ public class HadoopReadOnlyFileSystem extends FileSystem {
throw new UnsupportedOperationException();
}
- /**
- * A {@link InputStream} to wrap {@link org.apache.paimon.fs.SeekableInputStream} for Flink's
- * input streams.
- */
+ /** A {@link InputStream} to wrap {@link SeekableInputStream} for Paimon's input streams. */
private static class FSDataWrappedInputStream extends InputStream
implements Seekable, PositionedReadable {
- private final org.apache.paimon.fs.SeekableInputStream seekableInputStream;
+ private final SeekableInputStream seekableInputStream;
- private FSDataWrappedInputStream(
- org.apache.paimon.fs.SeekableInputStream seekableInputStream) {
+ private FSDataWrappedInputStream(SeekableInputStream seekableInputStream) {
this.seekableInputStream = seekableInputStream;
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index 7b3bfb6e9..5c99d4058 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -55,7 +55,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.types.DataTypeChecks.getFieldTypes;
-/** Orc {@link FileFormat}. The main code is copied from Flink {@code OrcFileFormatFactory}. */
+/** Orc {@link FileFormat}. */
public class OrcFileFormat extends FileFormat {
public static final String IDENTIFIER = "orc";
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 12de77b24..b5267fd7b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -48,7 +48,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
-import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createFlinkVector;
+import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector;
import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.toOrcType;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -123,10 +123,9 @@ public class OrcReaderFactory implements FormatReaderFactory {
for (int i = 0; i < vectors.length; i++) {
String name = tableFieldNames.get(selectedFields[i]);
DataType type = tableFieldTypes.get(selectedFields[i]);
- vectors[i] = createFlinkVector(orcBatch.cols[tableFieldNames.indexOf(name)], type);
+ vectors[i] = createPaimonVector(orcBatch.cols[tableFieldNames.indexOf(name)], type);
}
- VectorizedColumnBatch flinkColumnBatch = new VectorizedColumnBatch(vectors);
- return new OrcReaderBatch(orcBatch, flinkColumnBatch, recycler);
+ return new OrcReaderBatch(orcBatch, new VectorizedColumnBatch(vectors), recycler);
}
// ------------------------------------------------------------------------
@@ -150,17 +149,18 @@ public class OrcReaderFactory implements FormatReaderFactory {
private final VectorizedRowBatch orcVectorizedRowBatch;
private final Pool.Recycler<OrcReaderBatch> recycler;
- private final VectorizedColumnBatch flinkColumnBatch;
+ private final VectorizedColumnBatch paimonColumnBatch;
private final ColumnarRowIterator result;
protected OrcReaderBatch(
final VectorizedRowBatch orcVectorizedRowBatch,
- final VectorizedColumnBatch flinkColumnBatch,
+ final VectorizedColumnBatch paimonColumnBatch,
final Pool.Recycler<OrcReaderBatch> recycler) {
this.orcVectorizedRowBatch = checkNotNull(orcVectorizedRowBatch);
this.recycler = checkNotNull(recycler);
- this.flinkColumnBatch = flinkColumnBatch;
- this.result = new ColumnarRowIterator(new ColumnarRow(flinkColumnBatch), this::recycle);
+ this.paimonColumnBatch = paimonColumnBatch;
+ this.result =
+ new ColumnarRowIterator(new ColumnarRow(paimonColumnBatch), this::recycle);
}
/**
@@ -177,10 +177,10 @@ public class OrcReaderFactory implements FormatReaderFactory {
}
private RecordIterator<InternalRow> convertAndGetIterator(VectorizedRowBatch orcBatch) {
- // no copying from the ORC column vectors to the Flink columns vectors necessary,
+ // no copying from the ORC column vectors to the Paimon columns vectors necessary,
// because they point to the same data arrays internally design
int batchSize = orcBatch.size;
- flinkColumnBatch.setNumRows(batchSize);
+ paimonColumnBatch.setNumRows(batchSize);
result.set(batchSize);
return result;
}
@@ -350,7 +350,7 @@ public class OrcReaderFactory implements FormatReaderFactory {
OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
- // configure filesystem from Flink filesystem
+ // configure filesystem from Paimon FileIO
readerOptions.filesystem(new HadoopReadOnlyFileSystem(fileIO));
return OrcFile.createReader(hPath, readerOptions);
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/ThreadLocalClassLoaderConfiguration.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/ThreadLocalClassLoaderConfiguration.java
index d03845896..4ce5229c4 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/ThreadLocalClassLoaderConfiguration.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/ThreadLocalClassLoaderConfiguration.java
@@ -24,8 +24,8 @@ import java.net.URL;
/**
* Workaround for https://issues.apache.org/jira/browse/ORC-653.
*
- * <p>Since the conf is effectively cached across Flink jobs, at least force the thread local
- * classloader to avoid classloader leaks.
+ * <p>Since the conf is effectively cached across jobs, at least force the thread local classloader
+ * to avoid classloader leaks.
*/
public final class ThreadLocalClassLoaderConfiguration extends Configuration {
public ThreadLocalClassLoaderConfiguration() {}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java
index c887c4135..21154c496 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-/** This column vector is used to adapt hive's ColumnVector to Flink's ColumnVector. */
+/** This column vector is used to adapt hive's ColumnVector to Paimon's ColumnVector. */
public abstract class AbstractOrcColumnVector
implements org.apache.paimon.data.columnar.ColumnVector {
@@ -49,7 +49,7 @@ public abstract class AbstractOrcColumnVector
return !vector.noNulls && vector.isNull[vector.isRepeating ? 0 : i];
}
- public static org.apache.paimon.data.columnar.ColumnVector createFlinkVector(
+ public static org.apache.paimon.data.columnar.ColumnVector createPaimonVector(
ColumnVector vector, DataType dataType) {
if (vector instanceof LongColumnVector) {
if (dataType.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java
index df9944975..fd075417c 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java
@@ -25,23 +25,23 @@ import org.apache.paimon.types.ArrayType;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-/** This column vector is used to adapt hive's ListColumnVector to Flink's ArrayColumnVector. */
+/** This column vector is used to adapt hive's ListColumnVector to Paimon's ArrayColumnVector. */
public class OrcArrayColumnVector extends AbstractOrcColumnVector
implements org.apache.paimon.data.columnar.ArrayColumnVector {
private final ListColumnVector hiveVector;
- private final ColumnVector flinkVector;
+ private final ColumnVector paimonVector;
public OrcArrayColumnVector(ListColumnVector hiveVector, ArrayType type) {
super(hiveVector);
this.hiveVector = hiveVector;
- this.flinkVector = createFlinkVector(hiveVector.child, type.getElementType());
+ this.paimonVector = createPaimonVector(hiveVector.child, type.getElementType());
}
@Override
public InternalArray getArray(int i) {
long offset = hiveVector.offsets[i];
long length = hiveVector.lengths[i];
- return new ColumnarArray(flinkVector, (int) offset, (int) length);
+ return new ColumnarArray(paimonVector, (int) offset, (int) length);
}
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java
index 739f0d53a..d48bad886 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java
@@ -20,7 +20,7 @@ package org.apache.paimon.format.orc.reader;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-/** This column vector is used to adapt hive's BytesColumnVector to Flink's BytesColumnVector. */
+/** This column vector is used to adapt hive's BytesColumnVector to Paimon's BytesColumnVector. */
public class OrcBytesColumnVector extends AbstractOrcColumnVector
implements org.apache.paimon.data.columnar.BytesColumnVector {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java
index f71091b54..9ea4d763a 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import java.math.BigDecimal;
/**
- * This column vector is used to adapt hive's DecimalColumnVector to Flink's DecimalColumnVector.
+ * This column vector is used to adapt hive's DecimalColumnVector to Paimon's DecimalColumnVector.
*/
public class OrcDecimalColumnVector extends AbstractOrcColumnVector
implements org.apache.paimon.data.columnar.DecimalColumnVector {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java
index ed8d60df9..0c0b0cc51 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java
@@ -21,7 +21,7 @@ package org.apache.paimon.format.orc.reader;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
/**
- * This column vector is used to adapt hive's DoubleColumnVector to Flink's float and double
+ * This column vector is used to adapt hive's DoubleColumnVector to Paimon's float and double
* ColumnVector.
*/
public class OrcDoubleColumnVector extends AbstractOrcColumnVector
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java
index 3e49b20aa..e7dfe0e61 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java
@@ -21,7 +21,7 @@ package org.apache.paimon.format.orc.reader;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
/**
- * This column vector is used to adapt hive's LongColumnVector to Flink's boolean, byte, short, int
+ * This column vector is used to adapt hive's LongColumnVector to Paimon's boolean, byte, short, int
* and long ColumnVector.
*/
public class OrcLongColumnVector extends AbstractOrcColumnVector
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java
index dc937f32d..9b6025142 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java
@@ -25,25 +25,25 @@ import org.apache.paimon.types.MapType;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-/** This column vector is used to adapt hive's MapColumnVector to Flink's MapColumnVector. */
+/** This column vector is used to adapt hive's MapColumnVector to Paimon's MapColumnVector. */
public class OrcMapColumnVector extends AbstractOrcColumnVector
implements org.apache.paimon.data.columnar.MapColumnVector {
private final MapColumnVector hiveVector;
- private final ColumnVector keyFlinkVector;
- private final ColumnVector valueFlinkVector;
+ private final ColumnVector keyPaimonVector;
+ private final ColumnVector valuePaimonVector;
public OrcMapColumnVector(MapColumnVector hiveVector, MapType type) {
super(hiveVector);
this.hiveVector = hiveVector;
- this.keyFlinkVector = createFlinkVector(hiveVector.keys, type.getKeyType());
- this.valueFlinkVector = createFlinkVector(hiveVector.values, type.getValueType());
+ this.keyPaimonVector = createPaimonVector(hiveVector.keys, type.getKeyType());
+ this.valuePaimonVector = createPaimonVector(hiveVector.values, type.getValueType());
}
@Override
public InternalMap getMap(int i) {
long offset = hiveVector.offsets[i];
long length = hiveVector.lengths[i];
- return new ColumnarMap(keyFlinkVector, valueFlinkVector, (int) offset, (int) length);
+ return new ColumnarMap(keyPaimonVector, valuePaimonVector, (int) offset, (int) length);
}
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java
index f9c2388eb..698655e8a 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java
@@ -34,11 +34,11 @@ public class OrcRowColumnVector extends AbstractOrcColumnVector
public OrcRowColumnVector(StructColumnVector hiveVector, RowType type) {
super(hiveVector);
int len = hiveVector.fields.length;
- ColumnVector[] flinkVectors = new ColumnVector[len];
+ ColumnVector[] paimonVectors = new ColumnVector[len];
for (int i = 0; i < len; i++) {
- flinkVectors[i] = createFlinkVector(hiveVector.fields[i], type.getTypeAt(i));
+ paimonVectors[i] = createPaimonVector(hiveVector.fields[i], type.getTypeAt(i));
}
- this.columnarRow = new ColumnarRow(new VectorizedColumnBatch(flinkVectors));
+ this.columnarRow = new ColumnarRow(new VectorizedColumnBatch(paimonVectors));
}
@Override
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java
index 48a692ee8..85d2cfc7c 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java
@@ -32,7 +32,6 @@ import org.apache.orc.TypeDescription;
/** Util for orc types. */
public class OrcSplitReaderUtil {
- /** See {@code org.apache.flink.table.catalog.hive.util.HiveTypeUtil}. */
public static TypeDescription toOrcType(DataType type) {
type = type.copy(true);
switch (type.getTypeRoot()) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java
index 70c5ab58c..1da7232fc 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
/**
- * This column vector is used to adapt hive's TimestampColumnVector to Flink's
+ * This column vector is used to adapt hive's TimestampColumnVector to Paimon's
* TimestampColumnVector.
*/
public class OrcTimestampColumnVector extends AbstractOrcColumnVector
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/PhysicalWriterImpl.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/PhysicalWriterImpl.java
index f915b469c..20112604e 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/PhysicalWriterImpl.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/PhysicalWriterImpl.java
@@ -47,7 +47,7 @@ import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize;
* A slightly customised clone of {@link org.apache.orc.impl.PhysicalFsWriter}.
*
* <p>Whereas PhysicalFsWriter implementation works on the basis of a Path, this implementation
- * leverages Flink's {@link PositionOutputStream} to write the compressed data.
+ * leverages Paimon's {@link PositionOutputStream} to write the compressed data.
*
* <p>NOTE: If the ORC dependency version is updated, this file may have to be updated as well to be
* in sync with the new version's PhysicalFsWriter.
@@ -222,7 +222,7 @@ public class PhysicalWriterImpl implements PhysicalWriter {
@Override
public void close() {
// Just release the codec but don't close the internal stream here to avoid
- // Stream Closed or ClosedChannelException when Flink performs checkpoint.
+ // Stream Closed or ClosedChannelException when performs checkpoint.
OrcCodecPool.returnCodec(compress, codec);
codec = null;
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 854905358..24ed57c33 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -151,7 +151,7 @@ public class ParquetReaderFactory implements FormatReaderFactory {
}
}
- return Types.buildMessage().addFields(types).named("flink-parquet");
+ return Types.buildMessage().addFields(types).named("paimon-parquet");
}
private void checkSchema(MessageType fileSchema, MessageType requestedSchema)
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
index a8fbb6704..4ebc3e1bf 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
@@ -44,7 +44,7 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LE
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
-/** Schema converter converts Parquet schema to and from Flink internal types. */
+/** Schema converter converts Parquet schema to and from Paimon internal types. */
public class ParquetSchemaConverter {
static final String MAP_REPEATED_NAME = "key_value";
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
index fb6f302b4..e91f9096e 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
@@ -31,10 +31,7 @@ import org.apache.parquet.io.OutputFile;
import java.io.IOException;
-/**
- * A factory that creates a Parquet {@link FormatWriter}. The factory takes a user-supplied builder
- * to assemble Parquet's writer and then turns it into a Flink {@code BulkWriter}.
- */
+/** A factory that creates a Parquet {@link FormatWriter}. */
public class ParquetWriterFactory implements FormatWriterFactory {
/** The builder to construct the ParquetWriter. */
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java
index a88a692dc..057cc7c32 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java
@@ -55,7 +55,7 @@ public class ParquetRowDataBuilder
private class ParquetWriteSupport extends WriteSupport<InternalRow> {
- private final MessageType schema = convertToParquetMessageType("flink_schema", rowType);
+ private final MessageType schema = convertToParquetMessageType("paimon_schema", rowType);
private ParquetRowDataWriter writer;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/PositionOutputStreamAdapter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/PositionOutputStreamAdapter.java
index a4558b10a..72fda99e4 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/PositionOutputStreamAdapter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/PositionOutputStreamAdapter.java
@@ -24,16 +24,16 @@ import java.io.IOException;
import static org.apache.parquet.Preconditions.checkNotNull;
-/** An adapter to turn Flink's {@link PositionOutputStream} into a {@link PositionOutputStream}. */
+/** An adapter to turn Paimon's {@link PositionOutputStream} into a {@link PositionOutputStream}. */
class PositionOutputStreamAdapter extends PositionOutputStream {
- /** The Flink stream written to. */
+ /** The Paimon stream written to. */
private final org.apache.paimon.fs.PositionOutputStream out;
/**
* Create a new PositionOutputStreamAdapter.
*
- * @param out The Flink stream written to.
+ * @param out The Paimon stream written to.
*/
PositionOutputStreamAdapter(org.apache.paimon.fs.PositionOutputStream out) {
this.out = checkNotNull(out, "out");
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/StreamOutputFile.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/StreamOutputFile.java
index 975f646e2..b7ab0bc52 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/StreamOutputFile.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/StreamOutputFile.java
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/**
- * An implementation of Parquet's {@link OutputFile} interface that goes against a Flink {@link
+ * An implementation of Parquet's {@link OutputFile} interface that goes against a Paimon {@link
* PositionOutputStream}.
*
* <p>Because the implementation goes against an open stream, rather than open its own streams
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTest.java
index 389611de1..054a3369b 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTest.java
@@ -56,7 +56,7 @@ class AvroBulkFormatTest {
BinaryString.fromString("AvroBulk"),
BinaryString.fromString("FormatTest")),
GenericRow.of(
- BinaryString.fromString("Apache"), BinaryString.fromString("Flink")),
+ BinaryString.fromString("Apache"), BinaryString.fromString("Paimon")),
GenericRow.of(
BinaryString.fromString(
"永和九年,岁在癸丑,暮春之初,会于会稽山阴之兰亭,修禊事也。群贤毕至,少"
@@ -77,7 +77,7 @@ class AvroBulkFormatTest {
BinaryString.fromString("only one record"))
// -------- file length 752 --------
);
- private static final List<Long> BLOCK_STARTS = Arrays.asList(232L, 593L, 705L);
+ private static final List<Long> BLOCK_STARTS = Arrays.asList(233L, 595L, 707L);
private File tmpFile;
diff --git a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
index dec145e6f..b33168e25 100644
--- a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
+++ b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
@@ -28,7 +28,7 @@ import org.apache.paimon.types.VarCharType;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-/** Utils for converting types related classes between Flink and Hive. */
+/** Utils for converting types related classes between Paimon and Hive. */
public class HiveTypeUtils {
public static TypeInfo logicalTypeToTypeInfo(DataType logicalType) {
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
index 5c0f54eff..a50eca546 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
@@ -74,7 +74,7 @@ public class HiveSchema {
throw new UnsupportedOperationException(
"Location property is missing for table "
+ tableName
- + ". Currently Flink paimon only supports external table for Hive "
+ + ". Currently Paimon only supports external table for Hive "
+ "so location property must be set.");
}
Path path = new Path(location);
@@ -152,7 +152,7 @@ public class HiveSchema {
throw new IllegalArgumentException(
"Hive DDL and paimon schema mismatched! "
+ "It is recommended not to write any column definition "
- + "as Flink paimon external table can read schema from the specified location.\n"
+ + "as Paimon external table can read schema from the specified location.\n"
+ "Mismatched fields are:\n"
+ String.join("--------------------\n", mismatched));
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
index 703595096..6259fcb71 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
@@ -36,7 +36,7 @@ public class PaimonOutputFormat implements OutputFormat<InternalRow, InternalRow
FileSystem fileSystem, JobConf jobConf, String s, Progressable progressable)
throws IOException {
throw new UnsupportedOperationException(
- "Flink paimon currently can only be used as an input format for Hive.");
+ "Paimon currently can only be used as an input format for Hive.");
}
@Override
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
index 56e880c4c..f11ef4990 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
@@ -37,7 +37,7 @@ public class PaimonDateObjectInspector extends AbstractPrimitiveJavaObjectInspec
@Override
public Date getPrimitiveJavaObject(Object o) {
- // Flink stores date as an integer (epoch day, 1970-01-01 = day 0)
+ // Paimon stores date as an integer (epoch day, 1970-01-01 = day 0)
// while constructor of Date accepts epoch millis
return o == null ? null : DateTimeUtils.toSQLDate((Integer) o);
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTableSchemaTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTableSchemaTest.java
index c3e9e1db3..3e9a63620 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTableSchemaTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTableSchemaTest.java
@@ -116,7 +116,7 @@ public class HiveTableSchemaTest {
"\n",
"Hive DDL and paimon schema mismatched! "
+ "It is recommended not to write any column definition "
- + "as Flink paimon external table can read schema from the specified location.",
+ + "as Paimon external table can read schema from the specified location.",
"Mismatched fields are:",
"Field #1",
"Hive DDL : mismatched string",
@@ -145,7 +145,7 @@ public class HiveTableSchemaTest {
"\n",
"Hive DDL and paimon schema mismatched! "
+ "It is recommended not to write any column definition "
- + "as Flink paimon external table can read schema from the specified location.",
+ + "as Paimon external table can read schema from the specified location.",
"Mismatched fields are:",
"Field #1",
"Hive DDL : null",
@@ -183,7 +183,7 @@ public class HiveTableSchemaTest {
"\n",
"Hive DDL and paimon schema mismatched! "
+ "It is recommended not to write any column definition "
- + "as Flink paimon external table can read schema from the specified location.",
+ + "as Paimon external table can read schema from the specified location.",
"Mismatched fields are:",
"Field #3",
"Hive DDL : d int",
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
index 7cf939350..04c8a1d68 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
@@ -777,7 +777,7 @@ public class PaimonStorageHandlerITCase {
Collections.emptyList(),
Collections.emptyList());
- // TODO add NaN related tests after FLINK-27627 and FLINK-27628 are fixed
+ // TODO add NaN related tests
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
diff --git a/paimon-spark/paimon-spark-2/src/main/java/org/apache/paimon/spark/SparkDataSourceReader.java b/paimon-spark/paimon-spark-2/src/main/java/org/apache/paimon/spark/SparkDataSourceReader.java
index 6d35dfdfd..388b0c688 100644
--- a/paimon-spark/paimon-spark-2/src/main/java/org/apache/paimon/spark/SparkDataSourceReader.java
+++ b/paimon-spark/paimon-spark-2/src/main/java/org/apache/paimon/spark/SparkDataSourceReader.java
@@ -117,7 +117,7 @@ public class SparkDataSourceReader
@Override
public StructType readSchema() {
RowType rowType = table.rowType();
- return SparkTypeUtils.fromFlinkRowType(
+ return SparkTypeUtils.fromPaimonRowType(
projectedFields == null ? rowType : TypeUtils.project(rowType, projectedFields));
}
diff --git a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
index 26a89c4ec..8d9ac48f7 100644
--- a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
+++ b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
@@ -84,7 +84,7 @@ public class SparkInternalRowTest {
Function1<Object, Object> sparkConverter =
CatalystTypeConverters.createToScalaConverter(
- SparkTypeUtils.fromFlinkType(ALL_TYPES));
+ SparkTypeUtils.fromPaimonType(ALL_TYPES));
org.apache.spark.sql.Row sparkRow =
(org.apache.spark.sql.Row)
sparkConverter.apply(new SparkInternalRow(ALL_TYPES).replace(rowData));
diff --git a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 784957610..b374966a8 100644
--- a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -62,7 +62,7 @@ public class SparkReadITCase {
Path warehousePath = new Path("file:" + warehouse);
spark = SparkSession.builder().master("local[2]").getOrCreate();
- // Flink sink
+ // Paimon sink
tablePath1 = new Path(warehousePath, "default.db/t1");
SimpleTableTestHelper testHelper1 = createTestHelper(tablePath1);
testHelper1.write(GenericRow.of(1, 2L, BinaryString.fromString("1")));
diff --git a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
index 7220d2246..f5cbf8db9 100644
--- a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
+++ b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
@@ -24,7 +24,7 @@ import org.apache.paimon.types.RowType;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;
-import static org.apache.paimon.spark.SparkTypeUtils.fromFlinkRowType;
+import static org.apache.paimon.spark.SparkTypeUtils.fromPaimonRowType;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link SparkTypeUtils}. */
@@ -90,7 +90,7 @@ public class SparkTypeTest {
+ "StructField(decimal2,DecimalType(38,2),true), "
+ "StructField(decimal3,DecimalType(10,1),true))";
- StructType sparkType = fromFlinkRowType(ALL_TYPES);
+ StructType sparkType = fromPaimonRowType(ALL_TYPES);
assertThat(sparkType.toString()).isEqualTo(expected);
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkArrayData.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkArrayData.java
index 654661202..c99f857c9 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkArrayData.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkArrayData.java
@@ -33,11 +33,11 @@ import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
-import static org.apache.paimon.spark.SparkInternalRow.fromFlink;
+import static org.apache.paimon.spark.SparkInternalRow.fromPaimon;
import static org.apache.paimon.utils.RowDataUtils.copyArray;
import static org.apache.paimon.utils.TypeUtils.timestampPrecision;
-/** Spark {@link ArrayData} to wrap flink {@code ArrayData}. */
+/** Spark {@link ArrayData} to wrap Paimon {@link InternalArray}. */
public class SparkArrayData extends ArrayData {
private final DataType elementType;
@@ -67,7 +67,7 @@ public class SparkArrayData extends ArrayData {
public Object[] array() {
Object[] objects = new Object[numElements()];
for (int i = 0; i < objects.length; i++) {
- objects[i] = fromFlink(RowDataUtils.get(array, i, elementType), elementType);
+ objects[i] = fromPaimon(RowDataUtils.get(array, i, elementType), elementType);
}
return objects;
}
@@ -117,7 +117,7 @@ public class SparkArrayData extends ArrayData {
}
private long getTimestampMicros(int ordinal) {
- return fromFlink(array.getTimestamp(ordinal, timestampPrecision(elementType)));
+ return fromPaimon(array.getTimestamp(ordinal, timestampPrecision(elementType)));
}
@Override
@@ -132,12 +132,12 @@ public class SparkArrayData extends ArrayData {
@Override
public Decimal getDecimal(int ordinal, int precision, int scale) {
- return fromFlink(array.getDecimal(ordinal, precision, scale));
+ return fromPaimon(array.getDecimal(ordinal, precision, scale));
}
@Override
public UTF8String getUTF8String(int ordinal) {
- return fromFlink(array.getString(ordinal));
+ return fromPaimon(array.getString(ordinal));
}
@Override
@@ -152,17 +152,17 @@ public class SparkArrayData extends ArrayData {
@Override
public InternalRow getStruct(int ordinal, int numFields) {
- return fromFlink(array.getRow(ordinal, numFields), (RowType) elementType);
+ return fromPaimon(array.getRow(ordinal, numFields), (RowType) elementType);
}
@Override
public ArrayData getArray(int ordinal) {
- return fromFlink(array.getArray(ordinal), (ArrayType) elementType);
+ return fromPaimon(array.getArray(ordinal), (ArrayType) elementType);
}
@Override
public MapData getMap(int ordinal) {
- return fromFlink(array.getMap(ordinal), elementType);
+ return fromPaimon(array.getMap(ordinal), elementType);
}
@Override
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 12e91ca97..9196bfef3 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -61,7 +61,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.paimon.spark.SparkTypeUtils.toFlinkType;
+import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
/** Spark {@link TableCatalog} for paimon. */
public class SparkCatalog implements TableCatalog, SupportsNamespaces {
@@ -269,7 +269,7 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
SchemaChange.Move move = getMove(add.position(), add.fieldNames());
return SchemaChange.addColumn(
add.fieldNames()[0],
- toFlinkType(add.dataType()).copy(add.isNullable()),
+ toPaimonType(add.dataType()).copy(add.isNullable()),
add.comment(),
move);
} else if (change instanceof RenameColumn) {
@@ -284,7 +284,7 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
UpdateColumnType update = (UpdateColumnType) change;
validateAlterNestedField(update.fieldNames());
return SchemaChange.updateColumnType(
- update.fieldNames()[0], toFlinkType(update.newDataType()));
+ update.fieldNames()[0], toPaimonType(update.newDataType()));
} else if (change instanceof UpdateColumnNullability) {
UpdateColumnNullability update = (UpdateColumnNullability) change;
return SchemaChange.updateColumnNullability(update.fieldNames(), update.nullable());
@@ -334,7 +334,7 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
.map(String::trim)
.collect(Collectors.toList());
return new Schema(
- ((RowType) toFlinkType(schema)).getFields(),
+ ((RowType) toPaimonType(schema)).getFields(),
Arrays.stream(partitions)
.map(partition -> partition.references()[0].describe())
.collect(Collectors.toList()),
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java
index c0caf6706..f330435a6 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java
@@ -114,7 +114,7 @@ public class SparkInternalRow extends org.apache.spark.sql.catalyst.InternalRow
private long getTimestampMicros(int ordinal) {
DataType type = rowType.getTypeAt(ordinal);
- return fromFlink(row.getTimestamp(ordinal, timestampPrecision(type)));
+ return fromPaimon(row.getTimestamp(ordinal, timestampPrecision(type)));
}
@Override
@@ -130,12 +130,12 @@ public class SparkInternalRow extends org.apache.spark.sql.catalyst.InternalRow
@Override
public Decimal getDecimal(int ordinal, int precision, int scale) {
org.apache.paimon.data.Decimal decimal = row.getDecimal(ordinal, precision, scale);
- return fromFlink(decimal);
+ return fromPaimon(decimal);
}
@Override
public UTF8String getUTF8String(int ordinal) {
- return fromFlink(row.getString(ordinal));
+ return fromPaimon(row.getString(ordinal));
}
@Override
@@ -150,17 +150,17 @@ public class SparkInternalRow extends org.apache.spark.sql.catalyst.InternalRow
@Override
public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal, int numFields) {
- return fromFlink(row.getRow(ordinal, numFields), (RowType) rowType.getTypeAt(ordinal));
+ return fromPaimon(row.getRow(ordinal, numFields), (RowType) rowType.getTypeAt(ordinal));
}
@Override
public ArrayData getArray(int ordinal) {
- return fromFlink(row.getArray(ordinal), (ArrayType) rowType.getTypeAt(ordinal));
+ return fromPaimon(row.getArray(ordinal), (ArrayType) rowType.getTypeAt(ordinal));
}
@Override
public MapData getMap(int ordinal) {
- return fromFlink(row.getMap(ordinal), rowType.getTypeAt(ordinal));
+ return fromPaimon(row.getMap(ordinal), rowType.getTypeAt(ordinal));
}
@Override
@@ -168,57 +168,57 @@ public class SparkInternalRow extends org.apache.spark.sql.catalyst.InternalRow
return SpecializedGettersReader.read(this, ordinal, dataType);
}
- public static Object fromFlink(Object o, DataType type) {
+ public static Object fromPaimon(Object o, DataType type) {
if (o == null) {
return null;
}
switch (type.getTypeRoot()) {
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- return fromFlink((Timestamp) o);
+ return fromPaimon((Timestamp) o);
case CHAR:
case VARCHAR:
- return fromFlink((BinaryString) o);
+ return fromPaimon((BinaryString) o);
case DECIMAL:
- return fromFlink((org.apache.paimon.data.Decimal) o);
+ return fromPaimon((org.apache.paimon.data.Decimal) o);
case ARRAY:
- return fromFlink((InternalArray) o, (ArrayType) type);
+ return fromPaimon((InternalArray) o, (ArrayType) type);
case MAP:
case MULTISET:
- return fromFlink((InternalMap) o, type);
+ return fromPaimon((InternalMap) o, type);
case ROW:
- return fromFlink((InternalRow) o, (RowType) type);
+ return fromPaimon((InternalRow) o, (RowType) type);
default:
return o;
}
}
- public static UTF8String fromFlink(BinaryString string) {
+ public static UTF8String fromPaimon(BinaryString string) {
return UTF8String.fromBytes(string.toBytes());
}
- public static Decimal fromFlink(org.apache.paimon.data.Decimal decimal) {
+ public static Decimal fromPaimon(org.apache.paimon.data.Decimal decimal) {
return Decimal.apply(decimal.toBigDecimal());
}
- public static org.apache.spark.sql.catalyst.InternalRow fromFlink(
+ public static org.apache.spark.sql.catalyst.InternalRow fromPaimon(
InternalRow row, RowType rowType) {
return new SparkInternalRow(rowType).replace(row);
}
- public static long fromFlink(Timestamp timestamp) {
+ public static long fromPaimon(Timestamp timestamp) {
return DateTimeUtils.fromJavaTimestamp(timestamp.toSQLTimestamp());
}
- public static ArrayData fromFlink(InternalArray array, ArrayType arrayType) {
- return fromFlinkArrayElementType(array, arrayType.getElementType());
+ public static ArrayData fromPaimon(InternalArray array, ArrayType arrayType) {
+ return fromPaimonArrayElementType(array, arrayType.getElementType());
}
- private static ArrayData fromFlinkArrayElementType(InternalArray array, DataType elementType) {
+ private static ArrayData fromPaimonArrayElementType(InternalArray array, DataType elementType) {
return new SparkArrayData(elementType).replace(array);
}
- public static MapData fromFlink(InternalMap map, DataType mapType) {
+ public static MapData fromPaimon(InternalMap map, DataType mapType) {
DataType keyType;
DataType valueType;
if (mapType instanceof MapType) {
@@ -232,7 +232,7 @@ public class SparkInternalRow extends org.apache.spark.sql.catalyst.InternalRow
}
return new ArrayBasedMapData(
- fromFlinkArrayElementType(map.keyArray(), keyType),
- fromFlinkArrayElementType(map.valueArray(), valueType));
+ fromPaimonArrayElementType(map.keyArray(), keyType),
+ fromPaimonArrayElementType(map.valueArray(), valueType));
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index efd8f3ae5..1c4b1567c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -94,7 +94,7 @@ public class SparkRow implements InternalRow {
@Override
public int getInt(int i) {
if (type.getTypeAt(i) instanceof DateType) {
- return toFlinkDate(row.get(i));
+ return toPaimonDate(row.get(i));
}
return row.getInt(i);
}
@@ -126,7 +126,7 @@ public class SparkRow implements InternalRow {
@Override
public Timestamp getTimestamp(int i, int precision) {
- return toFlinkTimestamp(row.get(i));
+ return toPaimonTimestamp(row.get(i));
}
@Override
@@ -136,12 +136,12 @@ public class SparkRow implements InternalRow {
@Override
public InternalArray getArray(int i) {
- return new FlinkArray(((ArrayType) type.getTypeAt(i)).getElementType(), row.getList(i));
+ return new PaimonArray(((ArrayType) type.getTypeAt(i)).getElementType(), row.getList(i));
}
@Override
public InternalMap getMap(int i) {
- return toFlinkMap((MapType) type.getTypeAt(i), row.getJavaMap(i));
+ return toPaimonMap((MapType) type.getTypeAt(i), row.getJavaMap(i));
}
@Override
@@ -149,7 +149,7 @@ public class SparkRow implements InternalRow {
return new SparkRow((RowType) type.getTypeAt(i), row.getStruct(i));
}
- private static int toFlinkDate(Object object) {
+ private static int toPaimonDate(Object object) {
if (object instanceof Date) {
return DateTimeUtils.toInternal((Date) object);
} else {
@@ -157,7 +157,7 @@ public class SparkRow implements InternalRow {
}
}
- private static Timestamp toFlinkTimestamp(Object object) {
+ private static Timestamp toPaimonTimestamp(Object object) {
if (object instanceof java.sql.Timestamp) {
return Timestamp.fromSQLTimestamp((java.sql.Timestamp) object);
} else {
@@ -165,7 +165,7 @@ public class SparkRow implements InternalRow {
}
}
- private static InternalMap toFlinkMap(MapType mapType, Map<Object, Object> map) {
+ private static InternalMap toPaimonMap(MapType mapType, Map<Object, Object> map) {
List<Object> keys = new ArrayList<>();
List<Object> values = new ArrayList<>();
map.forEach(
@@ -174,8 +174,8 @@ public class SparkRow implements InternalRow {
values.add(v);
});
- FlinkArray key = new FlinkArray(mapType.getKeyType(), keys);
- FlinkArray value = new FlinkArray(mapType.getValueType(), values);
+ PaimonArray key = new PaimonArray(mapType.getKeyType(), keys);
+ PaimonArray value = new PaimonArray(mapType.getValueType(), values);
return new InternalMap() {
@Override
public int size() {
@@ -194,12 +194,12 @@ public class SparkRow implements InternalRow {
};
}
- private static class FlinkArray implements InternalArray {
+ private static class PaimonArray implements InternalArray {
private final DataType elementType;
private final List<Object> list;
- private FlinkArray(DataType elementType, List<Object> list) {
+ private PaimonArray(DataType elementType, List<Object> list) {
this.list = list;
this.elementType = elementType;
}
@@ -237,7 +237,7 @@ public class SparkRow implements InternalRow {
@Override
public int getInt(int i) {
if (elementType instanceof DateType) {
- return toFlinkDate(getAs(i));
+ return toPaimonDate(getAs(i));
}
return getAs(i);
}
@@ -269,7 +269,7 @@ public class SparkRow implements InternalRow {
@Override
public Timestamp getTimestamp(int i, int precision) {
- return toFlinkTimestamp(getAs(i));
+ return toPaimonTimestamp(getAs(i));
}
@Override
@@ -279,12 +279,12 @@ public class SparkRow implements InternalRow {
@Override
public InternalArray getArray(int i) {
- return new FlinkArray(((ArrayType) elementType).getElementType(), getAs(i));
+ return new PaimonArray(((ArrayType) elementType).getElementType(), getAs(i));
}
@Override
public InternalMap getMap(int i) {
- return toFlinkMap((MapType) elementType, getAs(i));
+ return toPaimonMap((MapType) elementType, getAs(i));
}
@Override
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
index 5a2c3c691..d05081fa0 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
@@ -55,7 +55,7 @@ public class SparkScan implements Scan, SupportsReportStatistics {
@Override
public StructType readSchema() {
- return SparkTypeUtils.fromFlinkRowType(readBuilder.readType());
+ return SparkTypeUtils.fromPaimonRowType(readBuilder.readType());
}
@Override
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index cd1c9cb40..c462eb56d 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -74,7 +74,7 @@ public class SparkTable
@Override
public StructType schema() {
- return SparkTypeUtils.fromFlinkRowType(table.rowType());
+ return SparkTypeUtils.fromPaimonRowType(table.rowType());
}
@Override
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
index ae4456f8a..75d39c22c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
@@ -57,21 +57,21 @@ public class SparkTypeUtils {
private SparkTypeUtils() {}
- public static StructType fromFlinkRowType(RowType type) {
- return (StructType) fromFlinkType(type);
+ public static StructType fromPaimonRowType(RowType type) {
+ return (StructType) fromPaimonType(type);
}
- public static DataType fromFlinkType(org.apache.paimon.types.DataType type) {
- return type.accept(FlinkToSparkTypeVisitor.INSTANCE);
+ public static DataType fromPaimonType(org.apache.paimon.types.DataType type) {
+ return type.accept(PaimonToSparkTypeVisitor.INSTANCE);
}
- public static org.apache.paimon.types.DataType toFlinkType(DataType dataType) {
- return SparkToFlinkTypeVisitor.visit(dataType);
+ public static org.apache.paimon.types.DataType toPaimonType(DataType dataType) {
+ return SparkToPaimonTypeVisitor.visit(dataType);
}
- private static class FlinkToSparkTypeVisitor extends DataTypeDefaultVisitor<DataType> {
+ private static class PaimonToSparkTypeVisitor extends DataTypeDefaultVisitor<DataType> {
- private static final FlinkToSparkTypeVisitor INSTANCE = new FlinkToSparkTypeVisitor();
+ private static final PaimonToSparkTypeVisitor INSTANCE = new PaimonToSparkTypeVisitor();
@Override
public DataType visit(CharType charType) {
@@ -190,16 +190,16 @@ public class SparkTypeUtils {
}
}
- private static class SparkToFlinkTypeVisitor {
+ private static class SparkToPaimonTypeVisitor {
private final AtomicInteger currentIndex = new AtomicInteger(0);
static org.apache.paimon.types.DataType visit(DataType type) {
- return visit(type, new SparkToFlinkTypeVisitor());
+ return visit(type, new SparkToPaimonTypeVisitor());
}
static org.apache.paimon.types.DataType visit(
- DataType type, SparkToFlinkTypeVisitor visitor) {
+ DataType type, SparkToPaimonTypeVisitor visitor) {
if (type instanceof StructType) {
StructField[] fields = ((StructType) type).fields();
List<org.apache.paimon.types.DataType> fieldResults =
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/MinioTestContainer.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/MinioTestContainer.java
index b3c331f4b..6adafded3 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/MinioTestContainer.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/MinioTestContainer.java
@@ -43,7 +43,7 @@ import java.util.Map;
public class MinioTestContainer extends GenericContainer<MinioTestContainer>
implements BeforeAllCallback, AfterAllCallback {
- private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint";
+ private static final String PAIMON_CONFIG_S3_ENDPOINT = "s3.endpoint";
private static final int DEFAULT_PORT = 9000;
@@ -110,7 +110,7 @@ public class MinioTestContainer extends GenericContainer<MinioTestContainer>
public Map<String, String> getS3ConfigOptions() {
Map<String, String> config = new HashMap<>();
- config.put(FLINK_CONFIG_S3_ENDPOINT, getHttpEndpoint());
+ config.put(PAIMON_CONFIG_S3_ENDPOINT, getHttpEndpoint());
config.put("s3.path.style.access", "true");
config.put("s3.access.key", accessKey);
config.put("s3.secret.key", secretKey);
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
index 866ab1e98..7f11b6d5d 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
@@ -81,7 +81,7 @@ public class SparkInternalRowTest {
Function1<Object, Object> sparkConverter =
CatalystTypeConverters.createToScalaConverter(
- SparkTypeUtils.fromFlinkType(ALL_TYPES));
+ SparkTypeUtils.fromPaimonType(ALL_TYPES));
org.apache.spark.sql.Row sparkRow =
(org.apache.spark.sql.Row)
sparkConverter.apply(new SparkInternalRow(ALL_TYPES).replace(rowData));
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index 0d539f900..36454f6f2 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -79,7 +79,7 @@ public abstract class SparkReadTestBase {
@BeforeEach
public void beforeEach() throws Exception {
- // flink sink
+ // Paimon sink
tablePath1 = new Path(warehousePath, "default.db/t1");
createTable("t1");
writeTable(
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
index 1df0e93d2..d72c4415f 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
@@ -26,8 +26,8 @@ import org.junit.jupiter.api.Test;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.paimon.spark.SparkTypeUtils.fromFlinkRowType;
-import static org.apache.paimon.spark.SparkTypeUtils.toFlinkType;
+import static org.apache.paimon.spark.SparkTypeUtils.fromPaimonRowType;
+import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link SparkTypeUtils}. */
@@ -96,9 +96,9 @@ public class SparkTypeTest {
+ "StructField(decimal2,DecimalType(38,2),true),"
+ "StructField(decimal3,DecimalType(10,1),true))";
- StructType sparkType = fromFlinkRowType(ALL_TYPES);
+ StructType sparkType = fromPaimonRowType(ALL_TYPES);
assertThat(sparkType.toString().replace(", ", ",")).isEqualTo(expected);
- assertThat(toFlinkType(sparkType)).isEqualTo(ALL_TYPES);
+ assertThat(toPaimonType(sparkType)).isEqualTo(ALL_TYPES);
}
}
diff --git a/paimon-spark/pom.xml b/paimon-spark/pom.xml
index a618cbfce..cbc4594fc 100644
--- a/paimon-spark/pom.xml
+++ b/paimon-spark/pom.xml
@@ -46,8 +46,6 @@ under the License.
</modules>
<dependencies>
- <!-- Flink All dependencies -->
-
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-shade</artifactId>