Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/20 06:27:24 UTC

[incubator-paimon] branch master updated: [code] Clean Flink related comment and naming (#642)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5945fefd9 [code] Clean Flink related comment and naming (#642)
5945fefd9 is described below

commit 5945fefd90c28af5ac1d9e1463ae76c3247c799b
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Mar 20 14:27:20 2023 +0800

    [code] Clean Flink related comment and naming (#642)
---
 .../org/apache/paimon/codegen/CodeGenLoader.java   |   6 +-
 .../apache/paimon/codegen/CodeGeneratorImpl.java   |   1 -
 .../apache/paimon/codegen/SortCodeGenerator.scala  |   5 +-
 .../org/apache/paimon/codegen/CompileUtils.java    |   2 +-
 .../org/apache/paimon/codegen/GeneratedClass.java  |   2 +-
 .../paimon/codegen/NormalizedKeyComputer.java      |   2 +-
 .../java/org/apache/paimon/codegen/Projection.java |   5 +-
 .../apache/paimon/codegen/RecordComparator.java    |   2 +-
 .../org/apache/paimon/fs/local/LocalFileIO.java    |   2 +-
 .../apache/paimon/io/DataInputDeserializer.java    |   2 -
 .../paimon/io/DataInputViewStreamWrapper.java      |   5 +-
 .../org/apache/paimon/options/OptionsUtils.java    |  51 ----------
 .../apache/paimon/plugin/ComponentClassLoader.java |   2 -
 .../org/apache/paimon/plugin/PluginLoader.java     |   2 -
 .../org/apache/paimon/utils/DateTimeUtils.java     |   6 +-
 .../java/org/apache/paimon/utils/HadoopUtils.java  |  15 +--
 .../main/java/org/apache/paimon/utils/Pool.java    |  13 +--
 .../org/apache/paimon/utils/Preconditions.java     |   2 +-
 .../java/org/apache/paimon/utils/ProjectedRow.java |   2 -
 .../java/org/apache/paimon/utils/Projection.java   |   2 -
 .../org/apache/paimon/data/BinaryStringTest.java   |   2 +-
 .../apache/paimon/datagen/RowDataGenerator.java    |   2 +-
 .../apache/paimon/fs/HadoopConfigLoadingTest.java  |   8 +-
 .../org/apache/paimon/utils/CommonTestUtils.java   | 112 +--------------------
 .../src/main/java/org/apache/paimon/WriteMode.java |   2 +-
 .../org/apache/paimon/casting/CastExecutor.java    |   1 -
 .../apache/paimon/disk/ChannelReaderInputView.java |   9 +-
 .../paimon/disk/ChannelWriterOutputView.java       |   7 +-
 .../apache/paimon/disk/FileChannelManagerImpl.java |   2 +-
 .../apache/paimon/sort/BinaryExternalMerger.java   |   2 +-
 .../paimon/sort/BinaryInMemorySortBuffer.java      |  16 +--
 .../apache/paimon/sort/BinaryIndexedSortable.java  |   1 -
 .../apache/paimon/utils/ExecutorThreadFactory.java |  15 +--
 .../paimon/stats/FieldStatsCollectorTest.java      |  14 +--
 .../org/apache/paimon/utils/FailingFileIO.java     |   2 +-
 .../main/java/org/apache/paimon/oss/OSSFileIO.java |   4 +-
 .../main/java/org/apache/paimon/s3/S3FileIO.java   |   4 +-
 .../paimon/format/avro/AbstractAvroBulkFormat.java |   6 +-
 .../apache/paimon/format/avro/AvroFileFormat.java  |   4 +-
 .../paimon/format/avro/AvroSchemaConverter.java    |  10 +-
 .../format/avro/AvroToRowDataConverters.java       |   4 +-
 .../paimon/format/avro/AvroWriterFactory.java      |   3 +-
 .../format/avro/RowDataToAvroConverters.java       |   6 +-
 .../paimon/format/fs/HadoopReadOnlyFileSystem.java |  17 ++--
 .../apache/paimon/format/orc/OrcFileFormat.java    |   2 +-
 .../apache/paimon/format/orc/OrcReaderFactory.java |  22 ++--
 .../orc/ThreadLocalClassLoaderConfiguration.java   |   4 +-
 .../format/orc/reader/AbstractOrcColumnVector.java |   4 +-
 .../format/orc/reader/OrcArrayColumnVector.java    |   8 +-
 .../format/orc/reader/OrcBytesColumnVector.java    |   2 +-
 .../format/orc/reader/OrcDecimalColumnVector.java  |   2 +-
 .../format/orc/reader/OrcDoubleColumnVector.java   |   2 +-
 .../format/orc/reader/OrcLongColumnVector.java     |   2 +-
 .../format/orc/reader/OrcMapColumnVector.java      |  12 +--
 .../format/orc/reader/OrcRowColumnVector.java      |   6 +-
 .../format/orc/reader/OrcSplitReaderUtil.java      |   1 -
 .../orc/reader/OrcTimestampColumnVector.java       |   2 +-
 .../format/orc/writer/PhysicalWriterImpl.java      |   4 +-
 .../format/parquet/ParquetReaderFactory.java       |   2 +-
 .../format/parquet/ParquetSchemaConverter.java     |   2 +-
 .../format/parquet/ParquetWriterFactory.java       |   5 +-
 .../parquet/writer/ParquetRowDataBuilder.java      |   2 +-
 .../writer/PositionOutputStreamAdapter.java        |   6 +-
 .../format/parquet/writer/StreamOutputFile.java    |   2 +-
 .../paimon/format/avro/AvroBulkFormatTest.java     |   4 +-
 .../java/org/apache/paimon/hive/HiveTypeUtils.java |   2 +-
 .../java/org/apache/paimon/hive/HiveSchema.java    |   4 +-
 .../paimon/hive/mapred/PaimonOutputFormat.java     |   2 +-
 .../objectinspector/PaimonDateObjectInspector.java |   2 +-
 .../apache/paimon/hive/HiveTableSchemaTest.java    |   6 +-
 .../paimon/hive/PaimonStorageHandlerITCase.java    |   2 +-
 .../apache/paimon/spark/SparkDataSourceReader.java |   2 +-
 .../apache/paimon/spark/SparkInternalRowTest.java  |   2 +-
 .../org/apache/paimon/spark/SparkReadITCase.java   |   2 +-
 .../org/apache/paimon/spark/SparkTypeTest.java     |   4 +-
 .../org/apache/paimon/spark/SparkArrayData.java    |  18 ++--
 .../java/org/apache/paimon/spark/SparkCatalog.java |   8 +-
 .../org/apache/paimon/spark/SparkInternalRow.java  |  46 ++++-----
 .../java/org/apache/paimon/spark/SparkRow.java     |  30 +++---
 .../java/org/apache/paimon/spark/SparkScan.java    |   2 +-
 .../java/org/apache/paimon/spark/SparkTable.java   |   2 +-
 .../org/apache/paimon/spark/SparkTypeUtils.java    |  22 ++--
 .../apache/paimon/spark/MinioTestContainer.java    |   4 +-
 .../apache/paimon/spark/SparkInternalRowTest.java  |   2 +-
 .../org/apache/paimon/spark/SparkReadTestBase.java |   2 +-
 .../org/apache/paimon/spark/SparkTypeTest.java     |   8 +-
 paimon-spark/pom.xml                               |   2 -
 87 files changed, 210 insertions(+), 442 deletions(-)

diff --git a/paimon-codegen-loader/src/main/java/org/apache/paimon/codegen/CodeGenLoader.java b/paimon-codegen-loader/src/main/java/org/apache/paimon/codegen/CodeGenLoader.java
index 50f8ecd4f..3dbf4db36 100644
--- a/paimon-codegen-loader/src/main/java/org/apache/paimon/codegen/CodeGenLoader.java
+++ b/paimon-codegen-loader/src/main/java/org/apache/paimon/codegen/CodeGenLoader.java
@@ -20,10 +20,10 @@ package org.apache.paimon.codegen;
 
 import org.apache.paimon.plugin.PluginLoader;
 
-/** Copied and modified from the flink-table-planner-loader module. */
+/** Loader for the codegen classes. */
 public class CodeGenLoader {
 
-    private static final String FLINK_TABLE_STORE_CODEGEN_FAT_JAR = "paimon-codegen.jar";
+    private static final String CODEGEN_FAT_JAR = "paimon-codegen.jar";
 
     // Singleton lazy initialization
 
@@ -32,7 +32,7 @@ public class CodeGenLoader {
     private static synchronized PluginLoader getLoader() {
         if (loader == null) {
             // Avoid NoClassDefFoundError without cause by exception
-            loader = new PluginLoader(FLINK_TABLE_STORE_CODEGEN_FAT_JAR);
+            loader = new PluginLoader(CODEGEN_FAT_JAR);
         }
         return loader;
     }
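
For readers outside the codebase: getLoader() above is the classic synchronized lazy singleton, so the fat jar is only opened on first use. A minimal self-contained sketch of the same pattern, with a hypothetical ExpensiveResource standing in for PluginLoader (nothing below is Paimon API):

    // Sketch of the lazy-singleton pattern used by CodeGenLoader above.
    public final class LazyLoaderSketch {

        private static final String CODEGEN_FAT_JAR = "paimon-codegen.jar";

        private static ExpensiveResource loader;

        // synchronized so that two racing first callers cannot create two instances
        private static synchronized ExpensiveResource getLoader() {
            if (loader == null) {
                loader = new ExpensiveResource(CODEGEN_FAT_JAR);
            }
            return loader;
        }

        /** Hypothetical stand-in whose construction is assumed to be costly. */
        private static final class ExpensiveResource {
            private final String jarName;

            ExpensiveResource(String jarName) {
                this.jarName = jarName; // e.g. extract and index the fat jar here
            }
        }

        public static void main(String[] args) {
            System.out.println(getLoader() == getLoader()); // true: one shared instance
        }
    }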
diff --git a/paimon-codegen/src/main/java/org/apache/paimon/codegen/CodeGeneratorImpl.java b/paimon-codegen/src/main/java/org/apache/paimon/codegen/CodeGeneratorImpl.java
index c6e544d0d..eb3dd3d5e 100644
--- a/paimon-codegen/src/main/java/org/apache/paimon/codegen/CodeGeneratorImpl.java
+++ b/paimon-codegen/src/main/java/org/apache/paimon/codegen/CodeGeneratorImpl.java
@@ -56,7 +56,6 @@ public class CodeGeneratorImpl implements CodeGenerator {
     private SortSpec getAscendingSortSpec(int numFields) {
         SortSpec.SortSpecBuilder builder = SortSpec.builder();
         for (int i = 0; i < numFields; i++) {
-            // Flink's default null collation is NullCollation.LOW, see FlinkPlannerImpl
             builder.addField(i, true, false);
         }
         return builder.build();
diff --git a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/SortCodeGenerator.scala b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/SortCodeGenerator.scala
index 30d3639ea..aa2efd524 100644
--- a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/SortCodeGenerator.scala
+++ b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/SortCodeGenerator.scala
@@ -182,11 +182,10 @@ class SortCodeGenerator(val input: RowType, val sortSpec: SortSpec) {
   def generatePutNormalizedKeys(numKeyBytes: Int): mutable.ArrayBuffer[String] = {
     /* Example generated code, for int:
     if (record.isNullAt(0)) {
-      org.apache.flink.table.data.binary.BinaryRowDataUtil.minNormalizedKey(target, offset+0, 5);
+      SortUtil.minNormalizedKey(target, offset+0, 5);
     } else {
       target.put(offset+0, (byte) 1);
-      org.apache.flink.table.data.binary.BinaryRowDataUtil.putIntNormalizedKey(
-        record.getInt(0), target, offset+1, 4);
+      SortUtil.putIntNormalizedKey(record.getInt(0), target, offset+1, 4);
     }
      */
     val putKeys = new mutable.ArrayBuffer[String]
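
The generated snippet relies on SortUtil's normalized-key helpers. As a rough sketch of what a putIntNormalizedKey-style helper does (an assumption about its behavior, not Paimon's actual SortUtil): flip the sign bit and store the int big-endian, so that unsigned byte-wise comparison of the key bytes agrees with signed int order:

    import java.util.Arrays;

    public class NormalizedKeySketch {

        static void putIntNormalizedKey(int value, byte[] target, int offset) {
            int flipped = value ^ Integer.MIN_VALUE; // negatives now sort below positives
            target[offset] = (byte) (flipped >>> 24);
            target[offset + 1] = (byte) (flipped >>> 16);
            target[offset + 2] = (byte) (flipped >>> 8);
            target[offset + 3] = (byte) flipped;
        }

        public static void main(String[] args) {
            byte[] a = new byte[4];
            byte[] b = new byte[4];
            putIntNormalizedKey(-5, a, 0);
            putIntNormalizedKey(7, b, 0);
            // agrees with Integer.compare(-5, 7) < 0
            System.out.println(Arrays.compareUnsigned(a, b) < 0); // true
        }
    }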
diff --git a/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java b/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java
index fbe4235b2..ec45221cd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java
@@ -29,7 +29,7 @@ import java.util.Objects;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
-/** Utilities to compile a generated code to a Class. Copied from Flink. */
+/** Utilities to compile generated code to a Class. */
 public final class CompileUtils {
 
     // used for logging the generated codes to a same place
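
CompileUtils turns generated source into a Class at runtime. Janino appears in this commit's parent-first patterns (see the PluginLoader hunk below), so a plausible minimal compile step looks like the following generic Janino sketch; this is standard Janino usage, not the actual CompileUtils implementation:

    import org.codehaus.janino.SimpleCompiler;

    public class CompileSketch {

        public static void main(String[] args) throws Exception {
            String source = "public class Greeter { public static String greet() { return \"hi\"; } }";

            SimpleCompiler compiler = new SimpleCompiler();
            compiler.cook(source); // compile the generated source in memory

            Class<?> clazz = compiler.getClassLoader().loadClass("Greeter");
            System.out.println(clazz.getMethod("greet").invoke(null)); // hi
        }
    }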
diff --git a/paimon-common/src/main/java/org/apache/paimon/codegen/GeneratedClass.java b/paimon-common/src/main/java/org/apache/paimon/codegen/GeneratedClass.java
index d6791254b..97d639230 100644
--- a/paimon-common/src/main/java/org/apache/paimon/codegen/GeneratedClass.java
+++ b/paimon-common/src/main/java/org/apache/paimon/codegen/GeneratedClass.java
@@ -29,7 +29,7 @@ import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /**
  * A wrapper for generated class, defines a {@link #newInstance(ClassLoader)} method to get an
- * instance by reference objects easily. Copied from Flink.
+ * instance by reference objects easily.
  */
 public final class GeneratedClass<T> implements Serializable {
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/codegen/NormalizedKeyComputer.java b/paimon-common/src/main/java/org/apache/paimon/codegen/NormalizedKeyComputer.java
index f0d8b46b4..0a59b5b67 100644
--- a/paimon-common/src/main/java/org/apache/paimon/codegen/NormalizedKeyComputer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/codegen/NormalizedKeyComputer.java
@@ -23,7 +23,7 @@ import org.apache.paimon.memory.MemorySegment;
 
 /**
  * Normalized key computer for {@code SortBuffer}. For performance, subclasses are usually
- * implemented through CodeGenerator. Copied from Flink.
+ * implemented through CodeGenerator.
  */
 public interface NormalizedKeyComputer {
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/codegen/Projection.java b/paimon-common/src/main/java/org/apache/paimon/codegen/Projection.java
index 81922588c..e872c512e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/codegen/Projection.java
+++ b/paimon-common/src/main/java/org/apache/paimon/codegen/Projection.java
@@ -21,10 +21,7 @@ package org.apache.paimon.codegen;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 
-/**
- * Interface for code generated projection, which will map a RowData to another BinaryRowData.
- * Copied from Flink.
- */
+/** Interface for code-generated projection, which maps an InternalRow to a BinaryRow. */
 public interface Projection {
 
     BinaryRow apply(InternalRow row);
diff --git a/paimon-common/src/main/java/org/apache/paimon/codegen/RecordComparator.java b/paimon-common/src/main/java/org/apache/paimon/codegen/RecordComparator.java
index 8e3014183..8d58ca58d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/codegen/RecordComparator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/codegen/RecordComparator.java
@@ -25,7 +25,7 @@ import java.util.Comparator;
 
 /**
  * Record comparator for {@code BinaryInMemorySortBuffer}. For performance, subclasses are usually
- * implemented through CodeGenerator. A new interface for helping JVM inline. Copied from Flink.
+ * implemented through CodeGenerator. A dedicated interface to help JVM inlining.
  */
 public interface RecordComparator extends Comparator<InternalRow>, Serializable {
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
index 3f19bf316..bf6e3eab2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
@@ -93,7 +93,7 @@ public class LocalFileIO implements FileIO {
                     "File "
                             + file
                             + " does not exist or the user running "
-                            + "Flink ('"
+                            + "Paimon ('"
                             + System.getProperty("user.name")
                             + "') has insufficient permissions to access it.");
         }
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/DataInputDeserializer.java b/paimon-common/src/main/java/org/apache/paimon/io/DataInputDeserializer.java
index 77a34a918..5707466ed 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/DataInputDeserializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/DataInputDeserializer.java
@@ -71,8 +71,6 @@ public class DataInputDeserializer implements DataInputView, java.io.Serializabl
             this.position = buffer.arrayOffset() + buffer.position();
             this.end = this.position + buffer.remaining();
         } else if (buffer.isDirect() || buffer.isReadOnly()) {
-            // TODO: FLINK-8585 handle readonly and other non array based buffers more efficiently
-            // without data copy
             this.buffer = new byte[buffer.remaining()];
             this.position = 0;
             this.end = this.buffer.length;
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/DataInputViewStreamWrapper.java b/paimon-common/src/main/java/org/apache/paimon/io/DataInputViewStreamWrapper.java
index b14eddbd3..14f20ad8d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/DataInputViewStreamWrapper.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/DataInputViewStreamWrapper.java
@@ -23,10 +23,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 
-/**
- * Utility class that turns an {@link InputStream} into a {@link
- * org.apache.flink.core.memory.DataInputView}.
- */
+/** Utility class that turns an {@link InputStream} into a {@link DataInputView}. */
 public class DataInputViewStreamWrapper extends DataInputStream implements DataInputView {
 
     public DataInputViewStreamWrapper(InputStream in) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java b/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
index 54dc795a4..a0943208f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
@@ -20,12 +20,8 @@ package org.apache.paimon.options;
 
 import org.apache.paimon.utils.TimeUtils;
 
-import javax.annotation.Nonnull;
-
-import java.io.File;
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -36,53 +32,6 @@ import static org.apache.paimon.options.StructuredOptionsSplitter.escapeWithSing
 /** Utility class for {@link Options} related helper functions. */
 public class OptionsUtils {
 
-    private static final String[] EMPTY = new String[0];
-
-    /**
-     * Parses a string as a map of strings. The expected format of the map is:
-     *
-     * <pre>
-     * key1:value1,key2:value2
-     * </pre>
-     *
-     * <p>Parts of the string can be escaped by wrapping with single or double quotes.
-     *
-     * @param stringSerializedMap a string to parse
-     * @return parsed map
-     */
-    public static Map<String, String> parseMap(String stringSerializedMap) {
-        return StructuredOptionsSplitter.splitEscaped(stringSerializedMap, ',').stream()
-                .map(p -> StructuredOptionsSplitter.splitEscaped(p, ':'))
-                .collect(
-                        Collectors.toMap(
-                                arr -> arr.get(0), // key name
-                                arr -> arr.get(1) // value
-                                ));
-    }
-
-    @Nonnull
-    public static String[] splitPaths(@Nonnull String separatedPaths) {
-        return separatedPaths.length() > 0
-                ? separatedPaths.split(",|" + File.pathSeparator)
-                : EMPTY;
-    }
-
-    /**
-     * Extract and parse Flink configuration properties with a given name prefix and return the
-     * result as a Map.
-     */
-    public static Map<String, String> getPrefixedKeyValuePairs(
-            String prefix, Options configuration) {
-        Map<String, String> result = new HashMap<>();
-        for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
-            if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
-                String key = entry.getKey().substring(prefix.length());
-                result.put(key, entry.getValue());
-            }
-        }
-        return result;
-    }
-
     // --------------------------------------------------------------------------------------------
     //  Type conversion
     // --------------------------------------------------------------------------------------------
diff --git a/paimon-common/src/main/java/org/apache/paimon/plugin/ComponentClassLoader.java b/paimon-common/src/main/java/org/apache/paimon/plugin/ComponentClassLoader.java
index 226fb9bc1..4e24ae2a3 100644
--- a/paimon-common/src/main/java/org/apache/paimon/plugin/ComponentClassLoader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/plugin/ComponentClassLoader.java
@@ -51,8 +51,6 @@ import java.util.Iterator;
  *   <li>component-first: component -> bootstrap -> owner; opt-in.
  *   <li>owner-first: owner -> component -> bootstrap; opt-in.
  * </ul>
- *
- * <p>NOTE: Copied from Flink.
  */
 public class ComponentClassLoader extends URLClassLoader {
     private static final ClassLoader PLATFORM_OR_BOOTSTRAP_LOADER;
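
The delegation orders listed in the javadoc above come down to the order in which loadClass consults the component's own jars versus the owner. A simplified component-first sketch under that reading (not the real ComponentClassLoader):

    import java.net.URL;
    import java.net.URLClassLoader;

    class ComponentFirstClassLoader extends URLClassLoader {

        private final ClassLoader owner;

        ComponentFirstClassLoader(URL[] componentUrls, ClassLoader owner) {
            super(componentUrls, null); // null parent: only bootstrap/platform above us
            this.owner = owner;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    try {
                        c = findClass(name); // component jars first
                    } catch (ClassNotFoundException e) {
                        c = owner.loadClass(name); // then fall back to the owner
                    }
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }
    }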
diff --git a/paimon-common/src/main/java/org/apache/paimon/plugin/PluginLoader.java b/paimon-common/src/main/java/org/apache/paimon/plugin/PluginLoader.java
index 43199c452..ea1bfc62b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/plugin/PluginLoader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/plugin/PluginLoader.java
@@ -51,8 +51,6 @@ public class PluginLoader {
             Stream.concat(
                             Arrays.stream(PARENT_FIRST_LOGGING_PATTERNS),
                             Stream.of(
-                                    // These packages are shipped either by
-                                    // flink-table-runtime or flink-dist itself
                                     "org.codehaus.janino",
                                     "org.codehaus.commons",
                                     "org.apache.commons.lang3"))
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
index 66d509970..6ed26cafd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
@@ -37,11 +37,7 @@ import static java.time.temporal.ChronoField.NANO_OF_SECOND;
 import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;
 import static java.time.temporal.ChronoField.YEAR;
 
-/**
- * Utils for date time.
- *
- * <p>NOTE: Copied from Flink.
- */
+/** Utils for date time. */
 public class DateTimeUtils {
 
     /** The julian date of the epoch, 1970-01-01. */
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java
index 2dc13cc8b..5197fac33 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/HadoopUtils.java
@@ -89,12 +89,12 @@ public class HadoopUtils {
             }
         }
 
-        // Approach 2: Flink configuration (deprecated)
+        // Approach 2: Paimon configuration (deprecated)
         final String hdfsDefaultPath = options.getString(HDFS_DEFAULT_CONFIG, null);
         if (hdfsDefaultPath != null) {
             result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
             LOG.debug(
-                    "Using hdfs-default configuration-file path from Flink config: {}",
+                    "Using hdfs-default configuration-file path from Paimon config: {}",
                     hdfsDefaultPath);
             foundHadoopConfiguration = true;
         }
@@ -103,13 +103,14 @@ public class HadoopUtils {
         if (hdfsSitePath != null) {
             result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
             LOG.debug(
-                    "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
+                    "Using hdfs-site configuration-file path from Paimon config: {}", hdfsSitePath);
             foundHadoopConfiguration = true;
         }
 
         final String hadoopConfigPath = options.getString(PATH_HADOOP_CONFIG, null);
         if (hadoopConfigPath != null) {
-            LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
+            LOG.debug(
+                    "Searching Hadoop configuration files in Paimon config: {}", hadoopConfigPath);
             foundHadoopConfiguration =
                     addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
         }
@@ -122,8 +123,8 @@ public class HadoopUtils {
                     addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
         }
 
-        // Approach 4: Flink configuration
-        // add all configuration key with prefix 'hadoop.' in flink conf to hadoop conf
+        // Approach 4: Paimon configuration
+        // add all configuration keys with prefix 'hadoop.' in the Paimon conf to the Hadoop conf
         for (String key : options.keySet()) {
             for (String prefix : CONFIG_PREFIXES) {
                 if (key.startsWith(prefix)) {
@@ -131,7 +132,7 @@ public class HadoopUtils {
                     String value = options.getString(key, null);
                     result.set(newKey, value);
                     LOG.debug(
-                            "Adding Flink config entry for {} as {}={} to Hadoop config",
+                            "Adding Paimon config entry for {} as {}={} to Hadoop config",
                             key,
                             newKey,
                             value);
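
Approach 4 above strips a recognized prefix and copies the remainder into the Hadoop configuration. A self-contained sketch of that key-copying step, with plain maps standing in for Options and org.apache.hadoop.conf.Configuration:

    import java.util.HashMap;
    import java.util.Map;

    public class PrefixCopySketch {

        static final String[] CONFIG_PREFIXES = {"hadoop."};

        static Map<String, String> toHadoopConf(Map<String, String> paimonOptions) {
            Map<String, String> hadoopConf = new HashMap<>();
            for (Map.Entry<String, String> e : paimonOptions.entrySet()) {
                for (String prefix : CONFIG_PREFIXES) {
                    if (e.getKey().startsWith(prefix)) {
                        // "hadoop.dfs.replication" becomes "dfs.replication"
                        hadoopConf.put(e.getKey().substring(prefix.length()), e.getValue());
                    }
                }
            }
            return hadoopConf;
        }

        public static void main(String[] args) {
            Map<String, String> opts = new HashMap<>();
            opts.put("hadoop.dfs.replication", "2");
            System.out.println(toHadoopConf(opts)); // {dfs.replication=2}
        }
    }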
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java b/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java
index 5a9b2acbe..888bf8874 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java
@@ -18,22 +18,11 @@
 
 package org.apache.paimon.utils;
 
-import org.apache.paimon.reader.RecordReader;
-
 import javax.annotation.Nullable;
 
 import java.util.concurrent.ArrayBlockingQueue;
 
-/**
- * A pool to cache and recycle heavyweight objects, to reduce object allocation.
- *
- * <p>This pool can be used in the {@link RecordReader}, when the returned objects are heavyweight
- * and need to be reused for efficiency. Because the reading happens in I/O threads while the record
- * processing happens in Flink's main processing threads, these objects cannot be reused immediately
- * after being returned. They can be reused, once they are recycled back to the pool.
- *
- * @param <T> The type of object cached in the pool.
- */
+/** A pool to cache and recycle heavyweight objects, to reduce object allocation. */
 public class Pool<T> {
 
     private final ArrayBlockingQueue<T> pool;
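
Pool<T> recycles heavyweight objects through a bounded blocking queue: objects are seeded once, borrowed by I/O threads, and handed back after processing. A usage-level sketch under that assumption; the method names below are illustrative, not Pool's actual API:

    import java.util.concurrent.ArrayBlockingQueue;

    public class PoolSketch<T> {

        private final ArrayBlockingQueue<T> pool;

        public PoolSketch(int capacity) {
            this.pool = new ArrayBlockingQueue<>(capacity);
        }

        public void seed(T object) {
            pool.add(object); // pre-fill with reusable heavyweight objects
        }

        public T borrowObject() throws InterruptedException {
            return pool.take(); // blocks until a recycled object is available
        }

        public void recycle(T object) {
            pool.add(object); // return the object so it can be reused
        }

        public static void main(String[] args) throws InterruptedException {
            PoolSketch<byte[]> buffers = new PoolSketch<>(2);
            buffers.seed(new byte[1024]);
            byte[] buf = buffers.borrowObject(); // fill buf on the reading thread...
            buffers.recycle(buf); // ...and recycle it once the record is processed
        }
    }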
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Preconditions.java b/paimon-common/src/main/java/org/apache/paimon/utils/Preconditions.java
index 46441a23b..b90ea3b95 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Preconditions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Preconditions.java
@@ -27,7 +27,7 @@ import java.util.concurrent.ExecutionException;
  * A collection of static utility methods to validate input.
  *
  * <p>This class is modelled after Google Guava's Preconditions class, and partly takes code from
- * that class. We add this code to the Flink code base in order to reduce external dependencies.
+ * that class. We add this code to the Paimon code base in order to reduce external dependencies.
  */
 public final class Preconditions {
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
index 96c91520a..7c97e7652 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
@@ -36,8 +36,6 @@ import java.util.Arrays;
  * <p>Projection includes both reducing the accessible fields and reordering them.
  *
  * <p>Note: This class supports only top-level projections, not nested projections.
- *
- * <p>NOTE: Copied from Flink.
  */
 public class ProjectedRow implements InternalRow {
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Projection.java b/paimon-common/src/main/java/org/apache/paimon/utils/Projection.java
index d673a7724..bfdca8e71 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Projection.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Projection.java
@@ -38,8 +38,6 @@ import static org.apache.paimon.types.DataTypeRoot.ROW;
 /**
  * {@link Projection} represents a list of (possibly nested) indexes that can be used to project
  * data types. A row projection includes both reducing the accessible fields and reordering them.
- *
- * <p>NOTE: Copied from Flink.
  */
 public abstract class Projection {
 
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BinaryStringTest.java b/paimon-common/src/test/java/org/apache/paimon/data/BinaryStringTest.java
index b61d5bd24..c5c567389 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/BinaryStringTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/BinaryStringTest.java
@@ -131,7 +131,7 @@ public class BinaryStringTest {
         checkBasic(",", 1);
         checkBasic("hello", 5);
         checkBasic("hello world", 11);
-        checkBasic("Flink中文社区", 9);
+        checkBasic("Paimon中文社区", 10);
         checkBasic("中 文 社 区", 7);
 
         checkBasic("¡", 1); // 2 bytes char
diff --git a/paimon-common/src/test/java/org/apache/paimon/datagen/RowDataGenerator.java b/paimon-common/src/test/java/org/apache/paimon/datagen/RowDataGenerator.java
index 543b5257c..08497e1fb 100644
--- a/paimon-common/src/test/java/org/apache/paimon/datagen/RowDataGenerator.java
+++ b/paimon-common/src/test/java/org/apache/paimon/datagen/RowDataGenerator.java
@@ -21,7 +21,7 @@ package org.apache.paimon.datagen;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 
-/** Data generator for Flink's internal {@link InternalRow} type. */
+/** Data generator for Paimon's internal {@link InternalRow} type. */
 public class RowDataGenerator implements DataGenerator<InternalRow> {
 
     private final DataGenerator<?>[] fieldGenerators;
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java
index 0a34adbea..50816dc5a 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/HadoopConfigLoadingTest.java
@@ -39,7 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
- * Tests that validate the loading of the Hadoop configuration, relative to entries in the Flink
+ * Tests that validate the loading of the Hadoop configuration, relative to entries in the Paimon
  * configuration and the environment variables.
  */
 @SuppressWarnings("deprecation")
@@ -188,8 +188,8 @@ public class HadoopConfigLoadingTest {
         final String k5 = "key5";
 
         final String v1 = "from HADOOP_CONF_DIR";
-        final String v2 = "from Flink config `fs.hdfs.hadoopconf`";
-        final String v3 = "from Flink config `fs.hdfs.hdfsdefault`";
+        final String v2 = "from Paimon config `fs.hdfs.hadoopconf`";
+        final String v3 = "from Paimon config `fs.hdfs.hdfsdefault`";
         final String v4 = "from HADOOP_HOME/etc/hadoop";
         final String v5 = "from HADOOP_HOME/conf";
 
@@ -270,7 +270,7 @@ public class HadoopConfigLoadingTest {
     }
 
     @Test
-    public void loadFromFlinkConfEntry() throws Exception {
+    public void loadFromPaimonConfEntry() {
         final String prefix = "hadoop.";
 
         final String k1 = "brooklyn";
diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java b/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java
index 76ba50f81..0121a918b 100644
--- a/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java
@@ -18,14 +18,6 @@
 
 package org.apache.paimon.utils;
 
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.lang.reflect.Field;
 import java.time.Duration;
 import java.util.Map;
@@ -41,72 +33,6 @@ import static org.junit.jupiter.api.Assertions.fail;
 /** This class contains reusable utility methods for unit tests. */
 public class CommonTestUtils {
 
-    /**
-     * Creates a copy of an object via Java Serialization.
-     *
-     * @param original The original object.
-     * @return The copied object.
-     */
-    public static <T extends java.io.Serializable> T createCopySerializable(T original)
-            throws IOException {
-        if (original == null) {
-            throw new IllegalArgumentException();
-        }
-
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream(baos);
-        oos.writeObject(original);
-        oos.close();
-        baos.close();
-
-        ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-
-        try (ObjectInputStream ois = new ObjectInputStream(bais)) {
-            @SuppressWarnings("unchecked")
-            T copy = (T) ois.readObject();
-            return copy;
-        } catch (ClassNotFoundException e) {
-            throw new IOException(e);
-        }
-    }
-
-    /**
-     * Creates a temporary file that contains the given string. The file is written with the
-     * platform's default encoding.
-     *
-     * <p>The temp file is automatically deleted on JVM exit.
-     *
-     * @param contents The contents to be written to the file.
-     * @return The temp file URI.
-     */
-    public static String createTempFile(String contents) throws IOException {
-        File f = File.createTempFile("flink_test_", ".tmp");
-        f.deleteOnExit();
-
-        try (BufferedWriter out = new BufferedWriter(new FileWriter(f))) {
-            out.write(contents);
-        }
-        return f.toURI().toString();
-    }
-
-    /**
-     * Permanently blocks the current thread. The thread cannot be woken up via {@link
-     * Thread#interrupt()}.
-     */
-    public static void blockForeverNonInterruptibly() {
-        final Object lock = new Object();
-        //noinspection InfiniteLoopStatement
-        while (true) {
-            try {
-                //noinspection SynchronizationOnLocalVariableOrMethodParameter
-                synchronized (lock) {
-                    lock.wait();
-                }
-            } catch (InterruptedException ignored) {
-            }
-        }
-    }
-
     // ------------------------------------------------------------------------
     //  Manipulation of environment
     // ------------------------------------------------------------------------
@@ -133,6 +59,7 @@ public class CommonTestUtils {
             // only for Windows
             Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
             try {
+                @SuppressWarnings("JavaReflectionMemberAccess")
                 Field theCaseInsensitiveEnvironmentField =
                         processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
                 theCaseInsensitiveEnvironmentField.setAccessible(true);
@@ -150,28 +77,6 @@ public class CommonTestUtils {
         }
     }
 
-    /**
-     * Checks whether the given throwable contains the given cause as a cause. The cause is not
-     * checked on equality but on type equality.
-     *
-     * @param throwable Throwable to check for the cause
-     * @param cause Cause to look for
-     * @return True if the given Throwable contains the given cause (type equality); otherwise false
-     */
-    public static boolean containsCause(Throwable throwable, Class<? extends Throwable> cause) {
-        Throwable current = throwable;
-
-        while (current != null) {
-            if (cause.isAssignableFrom(current.getClass())) {
-                return true;
-            }
-
-            current = current.getCause();
-        }
-
-        return false;
-    }
-
     /** Checks whether an exception with a message occurs when running a piece of code. */
     public static void assertThrows(
             String msg, Class<? extends Exception> expected, Callable<?> code) {
@@ -213,19 +118,4 @@ public class CommonTestUtils {
             throw new TimeoutException(errorMsg);
         }
     }
-
-    /**
-     * Wait util the given condition is met or timeout.
-     *
-     * @param condition the condition to wait for.
-     * @param timeout the maximum time to wait for the condition to become true.
-     * @param errorMsg the error message to include in the <code>TimeoutException</code> if the
-     *     condition was not met before timeout.
-     * @throws TimeoutException if the condition is not met before timeout.
-     * @throws InterruptedException if the thread is interrupted.
-     */
-    public static void waitUtil(Supplier<Boolean> condition, Duration timeout, String errorMsg)
-            throws TimeoutException, InterruptedException {
-        waitUtil(condition, timeout, Duration.ofMillis(1), errorMsg);
-    }
 }
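
The surviving waitUtil overload polls the condition at a fixed pause until it holds or the deadline passes. A minimal sketch of that polling loop (a simplification, not the exact CommonTestUtils code):

    import java.time.Duration;
    import java.util.concurrent.TimeoutException;
    import java.util.function.Supplier;

    public class WaitUtilSketch {

        public static void waitUtil(
                Supplier<Boolean> condition, Duration timeout, Duration pause, String errorMsg)
                throws TimeoutException, InterruptedException {
            long deadline = System.nanoTime() + timeout.toNanos();
            while (!condition.get()) {
                if (System.nanoTime() >= deadline) {
                    throw new TimeoutException(errorMsg);
                }
                Thread.sleep(pause.toMillis()); // re-check after the pause
            }
        }
    }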
diff --git a/paimon-core/src/main/java/org/apache/paimon/WriteMode.java b/paimon-core/src/main/java/org/apache/paimon/WriteMode.java
index 13d221e7c..e815435ee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/WriteMode.java
+++ b/paimon-core/src/main/java/org/apache/paimon/WriteMode.java
@@ -23,7 +23,7 @@ import org.apache.paimon.options.description.InlineElement;
 
 import static org.apache.paimon.options.description.TextElement.text;
 
-/** Defines the write mode for flink paimon. */
+/** Defines the write mode for Paimon. */
 public enum WriteMode implements DescribedEnum {
     APPEND_ONLY(
             "append-only",
diff --git a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
index 32e35bac5..10b831da3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
@@ -20,7 +20,6 @@ package org.apache.paimon.casting;
 
 /**
  * Interface to model a function that performs the casting of a value from one type to another.
- * Copied from flink.
  *
  * @param <IN> Input internal type
  * @param <OUT> Output internal type
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java
index 5d1abea41..e0ecaa68c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java
@@ -21,6 +21,7 @@ package org.apache.paimon.disk;
 import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.compression.BlockDecompressor;
 import org.apache.paimon.data.AbstractPagedInputView;
+import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.memory.Buffer;
 import org.apache.paimon.memory.MemorySegment;
 
@@ -30,10 +31,10 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a {@link
- * BufferFileReader}, making it effectively a data input stream. The view reads it data in blocks
- * from the underlying channel and decompress it before returning to caller. The view can only read
- * data that has been written by {@link ChannelWriterOutputView}, due to block formatting.
+ * A {@link DataInputView} that is backed by a {@link BufferFileReader}, making it effectively a
+ * data input stream. The view reads its data in blocks from the underlying channel and
+ * decompresses it before returning it to the caller. The view can only read data that has been
+ * written by {@link ChannelWriterOutputView}, due to block formatting.
  */
 public class ChannelReaderInputView extends AbstractPagedInputView {
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
index f157e03ab..41089e197 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
@@ -21,15 +21,16 @@ package org.apache.paimon.disk;
 import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.compression.BlockCompressor;
 import org.apache.paimon.data.AbstractPagedOutputView;
+import org.apache.paimon.io.DataOutputView;
 import org.apache.paimon.memory.Buffer;
 import org.apache.paimon.memory.MemorySegment;
 
 import java.io.IOException;
 
 /**
- * A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a {@link FileIOChannel},
- * making it effectively a data output stream. The view will compress its data before writing it in
- * blocks to the underlying channel.
+ * A {@link DataOutputView} that is backed by a {@link FileIOChannel}, making it effectively a data
+ * output stream. The view will compress its data before writing it in blocks to the underlying
+ * channel.
  */
 public final class ChannelWriterOutputView extends AbstractPagedOutputView {
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java
index 8359a14af..d78a361c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java
@@ -66,7 +66,7 @@ public class FileChannelManagerImpl implements FileChannelManager {
         File[] files = new File[tempDirs.length];
         for (int i = 0; i < tempDirs.length; i++) {
             File baseDir = new File(tempDirs[i]);
-            String subfolder = String.format("flink-%s-%s", prefix, UUID.randomUUID());
+            String subfolder = String.format("paimon-%s-%s", prefix, UUID.randomUUID());
             File storageDir = new File(baseDir, subfolder);
 
             if (!storageDir.exists() && !storageDir.mkdirs()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalMerger.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalMerger.java
index 75dab68c2..08f63d7de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalMerger.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalMerger.java
@@ -33,7 +33,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 
-/** Record merger for sort of BinaryRow. Copied from Flink. */
+/** Record merger for sorting BinaryRow records. */
 public class BinaryExternalMerger extends AbstractBinaryExternalMerger<BinaryRow> {
 
     private final BinaryRowSerializer serializer;
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java
index 88c632f90..ec52a32e9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java
@@ -36,16 +36,11 @@ import java.util.ArrayList;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
- * In memory sort buffer for binary row. The main code is copied from Flink {@code
- * BinaryInMemorySortBuffer} instead of extended because it's a final class.
- *
- * <p>The main differences in the new sort buffer are:
+ * In memory sort buffer for binary row.
  *
  * <ul>
- *   <li>1. Add clear method to clean all memory.
- *   <li>2. Add tryInitialized() method to initialize memory before write and read in buffer, while
- *       the old buffer will do it in the constructor and reset().
- *   <li>3. Remove reset() and etc. methods which are not used in flink paimon.
+ *   <li>{@link #clear}: Cleans all memory.
+ *   <li>{@link #tryInitialize}: Initializes memory before the buffer is written to or read from.
  * </ul>
  */
 public class BinaryInMemorySortBuffer extends BinaryIndexedSortable implements SortBuffer {
@@ -127,11 +122,6 @@ public class BinaryInMemorySortBuffer extends BinaryIndexedSortable implements S
         }
     }
 
-    /**
-     * We add clear() method here instead of reset() to release all memory segments. The reset()
-     * method in flink sort buffer will clear memory and grab two buffers for
-     * currentSortIndexSegment and recordCollector.
-     */
     @Override
     public void clear() {
         if (this.isInitialized) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryIndexedSortable.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryIndexedSortable.java
index 202dc0dce..68e907d44 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryIndexedSortable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryIndexedSortable.java
@@ -33,7 +33,6 @@ import java.util.ArrayList;
 
 /**
  * An abstract sortable, provide basic compare and swap. Support writing of index and normalizedKey.
- * Copied from Flink.
  */
 public abstract class BinaryIndexedSortable implements IndexedSortable {
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java
index d4b2261ea..4a4dd636a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java
@@ -26,9 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /**
- * A thread factory intended for use by critical thread pools. Critical thread pools here mean
- * thread pools that support Flink's core coordination and processing work, and which must not
- * simply cause unnoticed errors.
+ * A thread factory intended for use by critical thread pools.
  *
  * <p>The thread factory can be given an {@link Thread.UncaughtExceptionHandler} for the threads. If
  * no handler is explicitly given, the default handler for uncaught exceptions will log the
@@ -42,9 +40,6 @@ import static org.apache.paimon.utils.Preconditions.checkNotNull;
  */
 public class ExecutorThreadFactory implements ThreadFactory {
 
-    /** The thread pool name used when no explicit pool name has been specified. */
-    private static final String DEFAULT_POOL_NAME = "flink-executor-pool";
-
     private final AtomicInteger threadNumber = new AtomicInteger(1);
 
     private final ThreadGroup group;
@@ -57,14 +52,6 @@ public class ExecutorThreadFactory implements ThreadFactory {
 
     // ------------------------------------------------------------------------
 
-    /**
-     * Creates a new thread factory using the default thread pool name ('flink-executor-pool') and
-     * the default uncaught exception handler (log exception and kill process).
-     */
-    public ExecutorThreadFactory() {
-        this(DEFAULT_POOL_NAME);
-    }
-
     /**
      * Creates a new thread factory using the given thread pool name and the default uncaught
      * exception handler (log exception and kill process).
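
ExecutorThreadFactory names pool threads with a common prefix and wires in an uncaught-exception handler so pool failures surface. A simplified JDK-only sketch of the same pattern (not the real class):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;

    public class NamedThreadFactory implements ThreadFactory {

        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String poolName;
        private final Thread.UncaughtExceptionHandler handler;

        public NamedThreadFactory(String poolName, Thread.UncaughtExceptionHandler handler) {
            this.poolName = poolName;
            this.handler = handler;
        }

        @Override
        public Thread newThread(Runnable runnable) {
            Thread t = new Thread(runnable, poolName + "-thread-" + threadNumber.getAndIncrement());
            t.setDaemon(true); // do not keep the JVM alive for pool threads
            t.setUncaughtExceptionHandler(handler); // surface otherwise-silent failures
            return t;
        }

        public static void main(String[] args) {
            ThreadFactory factory =
                    new NamedThreadFactory(
                            "demo-pool", (t, e) -> System.err.println(t.getName() + " died: " + e));
            ExecutorService executor = Executors.newSingleThreadExecutor(factory);
            executor.execute(() -> System.out.println(Thread.currentThread().getName()));
            executor.shutdown();
        }
    }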
diff --git a/paimon-core/src/test/java/org/apache/paimon/stats/FieldStatsCollectorTest.java b/paimon-core/src/test/java/org/apache/paimon/stats/FieldStatsCollectorTest.java
index 6d3d9f2b2..111d738c2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/stats/FieldStatsCollectorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/stats/FieldStatsCollectorTest.java
@@ -43,14 +43,14 @@ public class FieldStatsCollectorTest {
 
         collector.collect(
                 GenericRow.of(
-                        1, BinaryString.fromString("Flink"), new GenericArray(new int[] {1, 10})));
+                        1, BinaryString.fromString("Paimon"), new GenericArray(new int[] {1, 10})));
         assertThat(collector.extract())
                 .isEqualTo(
                         new FieldStats[] {
                             new FieldStats(1, 1, 0L),
                             new FieldStats(
-                                    BinaryString.fromString("Flink"),
-                                    BinaryString.fromString("Flink"),
+                                    BinaryString.fromString("Paimon"),
+                                    BinaryString.fromString("Paimon"),
                                     0L),
                             new FieldStats(null, null, 0L)
                         });
@@ -61,8 +61,8 @@ public class FieldStatsCollectorTest {
                         new FieldStats[] {
                             new FieldStats(1, 3, 0L),
                             new FieldStats(
-                                    BinaryString.fromString("Flink"),
-                                    BinaryString.fromString("Flink"),
+                                    BinaryString.fromString("Paimon"),
+                                    BinaryString.fromString("Paimon"),
                                     1L),
                             new FieldStats(null, null, 0L)
                         });
@@ -78,7 +78,7 @@ public class FieldStatsCollectorTest {
                             new FieldStats(1, 3, 1L),
                             new FieldStats(
                                     BinaryString.fromString("Apache"),
-                                    BinaryString.fromString("Flink"),
+                                    BinaryString.fromString("Paimon"),
                                     1L),
                             new FieldStats(null, null, 0L)
                         });
@@ -90,7 +90,7 @@ public class FieldStatsCollectorTest {
                             new FieldStats(1, 3, 1L),
                             new FieldStats(
                                     BinaryString.fromString("Apache"),
-                                    BinaryString.fromString("Flink"),
+                                    BinaryString.fromString("Paimon"),
                                     1L),
                             new FieldStats(null, null, 1L)
                         });
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FailingFileIO.java b/paimon-core/src/test/java/org/apache/paimon/utils/FailingFileIO.java
index 022b03eef..0c45cedb7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/FailingFileIO.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/FailingFileIO.java
@@ -84,7 +84,7 @@ public class FailingFileIO extends TraceableFileIO {
             throw new FileNotFoundException(
                     "File "
                             + path
-                            + " does not exist or the user running Flink ('"
+                            + " does not exist or the user running Paimon ('"
                             + System.getProperty("user.name")
                             + "') has insufficient permissions to access it.");
         }
diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
index 82e3cfdc1..6ecc76da3 100644
--- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
+++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
@@ -44,8 +44,8 @@ public class OSSFileIO extends HadoopCompliantFileIO {
     private static final Logger LOG = LoggerFactory.getLogger(OSSFileIO.class);
 
     /**
-     * In order to simplify, we make flink oss configuration keys same with hadoop oss module. So,
-     * we add all configuration key with prefix `fs.oss` in flink conf to hadoop conf.
+     * To keep things simple, the Paimon OSS configuration keys match the Hadoop OSS module's. So,
+     * we add every configuration key with prefix `fs.oss` in the Paimon conf to the Hadoop conf.
      */
     private static final String[] CONFIG_PREFIXES = {"fs.oss."};
 
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
index 8caf3b574..a58aa18ea 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
@@ -68,11 +68,11 @@ public class S3FileIO extends HadoopCompliantFileIO {
 
     @Override
     public void configure(CatalogContext context) {
-        this.hadoopOptions = mirrorCertainHadoopConfig(loadHadoopConfigFromFlink(context));
+        this.hadoopOptions = mirrorCertainHadoopConfig(loadHadoopConfigFromContext(context));
     }
 
     // add additional config entries from the IO config to the Hadoop config
-    private Options loadHadoopConfigFromFlink(CatalogContext context) {
+    private Options loadHadoopConfigFromContext(CatalogContext context) {
         Options hadoopConfig = new Options();
         for (String key : context.options().keySet()) {
             for (String prefix : CONFIG_PREFIXES) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AbstractAvroBulkFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AbstractAvroBulkFormat.java
index 622978b08..2ce1f7c18 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AbstractAvroBulkFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AbstractAvroBulkFormat.java
@@ -39,11 +39,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.function.Function;
 
-/**
- * Provides a {@link FormatReaderFactory} for Avro records.
- *
- * <p>NOTE: Copied from Flink.
- */
+/** Provides a {@link FormatReaderFactory} for Avro records. */
 public abstract class AbstractAvroBulkFormat<A> implements FormatReaderFactory {
 
     private static final long serialVersionUID = 1L;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
index 05571e314..ee4cff70f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
@@ -49,7 +49,7 @@ import java.util.function.Function;
 
 import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
 
-/** Avro {@link FileFormat}. The main code is copied from Flink {@code AvroFileFormatFactory}. */
+/** Avro {@link FileFormat}. */
 public class AvroFileFormat extends FileFormat {
 
     public static final String IDENTIFIER = "avro";
@@ -74,8 +74,6 @@ public class AvroFileFormat extends FileFormat {
         // if the schema given to the reader is not equal to the schema in header,
         // reader will automatically map the fields and give back records with our desired
         // schema
-        //
-        // for detailed discussion see comments in https://github.com/apache/flink/pull/18657
         DataType producedType = Projection.of(projection).project(type);
         return new AvroGenericRecordBulkFormat((RowType) producedType.copy(false));
     }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
index 99dcfe4bc..86fc18110 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
@@ -35,7 +35,7 @@ import org.apache.avro.SchemaBuilder;
 
 import java.util.List;
 
-/** Converts an Avro schema into Flink's type information. */
+/** Converts an Avro schema into Paimon's type information. */
 public class AvroSchemaConverter {
 
     private AvroSchemaConverter() {
@@ -43,20 +43,20 @@ public class AvroSchemaConverter {
     }
 
     /**
-     * Converts Flink SQL {@link DataType} (can be nested) into an Avro schema.
+     * Converts Paimon {@link DataType} (can be nested) into an Avro schema.
      *
-     * <p>Use "org.apache.flink.avro.generated.record" as the type name.
+     * <p>Use "org.apache.paimon.avro.generated.record" as the type name.
      *
      * @param schema the schema type, usually it should be the top level record type, e.g. not a
      *     nested type
      * @return Avro's {@link Schema} matching this logical type.
      */
     public static Schema convertToSchema(DataType schema) {
-        return convertToSchema(schema, "org.apache.flink.avro.generated.record");
+        return convertToSchema(schema, "org.apache.paimon.avro.generated.record");
     }
 
     /**
-     * Converts Flink SQL {@link DataType} (can be nested) into an Avro schema.
+     * Converts Paimon {@link DataType} (can be nested) into an Avro schema.
      *
      * <p>The "{rowName}_" is used as the nested row type name prefix in order to generate the right
      * schema. Nested record type that only differs with type name is still compatible.
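
The renamed namespace above becomes the full name of the generated Avro record. A plain Avro SchemaBuilder sketch showing the effect; the fields are made up for illustration and are not derived from any Paimon DataType:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class SchemaNameSketch {

        public static void main(String[] args) {
            Schema schema =
                    SchemaBuilder.record("org.apache.paimon.avro.generated.record")
                            .fields()
                            .requiredInt("id")
                            .optionalString("name")
                            .endRecord();
            System.out.println(schema.getFullName()); // org.apache.paimon.avro.generated.record
        }
    }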
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroToRowDataConverters.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroToRowDataConverters.java
index 3ccb9166a..cf15f4f41 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroToRowDataConverters.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroToRowDataConverters.java
@@ -53,8 +53,8 @@ import static org.apache.paimon.format.avro.AvroSchemaConverter.extractValueType
 public class AvroToRowDataConverters {
 
     /**
-     * Runtime converter that converts Avro data structures into objects of Flink Table & SQL
-     * internal data structures.
+     * Runtime converter that converts Avro data structures into objects of Paimon's internal
+     * data structures.
      */
     @FunctionalInterface
     public interface AvroToRowDataConverter extends Serializable {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroWriterFactory.java
index 7aca7fde6..5dd08e413 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroWriterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroWriterFactory.java
@@ -25,8 +25,7 @@ import org.apache.avro.file.DataFileWriter;
 import java.io.IOException;
 
 /**
- * A factory that creates an {@link AvroBulkWriter}. The factory takes a user-supplied builder to
- * assemble Parquet's writer and then turns it into a Flink {@code BulkWriter}.
+ * A factory that creates an {@link AvroBulkWriter}.
  *
  * @param <T> The type of record to write.
  */
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/RowDataToAvroConverters.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/RowDataToAvroConverters.java
index d23d97a19..a89e8d070 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/RowDataToAvroConverters.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/RowDataToAvroConverters.java
@@ -50,8 +50,8 @@ public class RowDataToAvroConverters {
     // --------------------------------------------------------------------------------
 
     /**
-     * Runtime converter that converts objects of Flink Table & SQL internal data structures to
-     * corresponding Avro data structures.
+     * Runtime converter that converts objects of Paimon internal data structures to corresponding
+     * Avro data structures.
      */
     @FunctionalInterface
     public interface RowDataToAvroConverter extends Serializable {
@@ -67,7 +67,7 @@ public class RowDataToAvroConverters {
 
     /**
      * Creates a runtime converter according to the given logical type that converts objects of
-     * Flink Table & SQL internal data structures to corresponding Avro data structures.
+     * Paimon internal data structures to corresponding Avro data structures.
      */
     public static RowDataToAvroConverter createConverter(DataType type) {
         final RowDataToAvroConverter converter;
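
createConverter builds one converter instance per logical type up front, so
the per-record path runs precomposed lambdas instead of re-dispatching on the
type for every value. A simplified, self-contained sketch of that pattern
(names and the type set here are illustrative, not Paimon's API):

import java.io.Serializable;
import java.util.function.Function;

public class ConverterDispatchSketch {

    // Simplified stand-in for RowDataToAvroConverter: maps one Paimon-side
    // value to the corresponding Avro runtime value.
    interface Converter extends Serializable, Function<Object, Object> {}

    enum TypeRoot { INTEGER, BOOLEAN, VARCHAR }

    // One converter is created per logical type and then reused per record.
    static Converter createConverter(TypeRoot type) {
        switch (type) {
            case INTEGER:
            case BOOLEAN:
                return value -> value; // primitives pass through unchanged
            case VARCHAR:
                return value -> value.toString(); // e.g. BinaryString -> CharSequence
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }
}
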
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java b/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java
index f1979fa38..1dcc0da00 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.fs;
 
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.utils.IOUtils;
 
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -52,7 +53,7 @@ public class HadoopReadOnlyFileSystem extends FileSystem {
     @Override
     public FSDataInputStream open(Path path) throws IOException {
         return new FSDataInputStream(
-                new FSDataWrappedInputStream(fileIO.newInputStream(toFlinkPath(path))));
+                new FSDataWrappedInputStream(fileIO.newInputStream(toPaimonPath(path))));
     }
 
     @Override
@@ -62,10 +63,10 @@ public class HadoopReadOnlyFileSystem extends FileSystem {
 
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
-        return toHadoopStatus(fileIO.getFileStatus(toFlinkPath(path)));
+        return toHadoopStatus(fileIO.getFileStatus(toPaimonPath(path)));
     }
 
-    private static org.apache.paimon.fs.Path toFlinkPath(Path path) {
+    private static org.apache.paimon.fs.Path toPaimonPath(Path path) {
         return new org.apache.paimon.fs.Path(path.toUri());
     }
 
@@ -138,17 +139,13 @@ public class HadoopReadOnlyFileSystem extends FileSystem {
         throw new UnsupportedOperationException();
     }
 
-    /**
-     * A {@link InputStream} to wrap {@link org.apache.paimon.fs.SeekableInputStream} for Flink's
-     * input streams.
-     */
+    /** An {@link InputStream} wrapping a Paimon {@link SeekableInputStream} for Hadoop. */
     private static class FSDataWrappedInputStream extends InputStream
             implements Seekable, PositionedReadable {
 
-        private final org.apache.paimon.fs.SeekableInputStream seekableInputStream;
+        private final SeekableInputStream seekableInputStream;
 
-        private FSDataWrappedInputStream(
-                org.apache.paimon.fs.SeekableInputStream seekableInputStream) {
+        private FSDataWrappedInputStream(SeekableInputStream seekableInputStream) {
             this.seekableInputStream = seekableInputStream;
         }
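
The wrapper above is pure delegation: Hadoop's Seekable and PositionedReadable
calls are forwarded to the underlying Paimon SeekableInputStream. A hedged
sketch of that pattern (the method bodies are assumptions about the omitted
code, not a copy of it):

import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.paimon.fs.SeekableInputStream;

import java.io.IOException;
import java.io.InputStream;

class SeekableStreamAdapterSketch extends InputStream
        implements Seekable, PositionedReadable {

    private final SeekableInputStream in;

    SeekableStreamAdapterSketch(SeekableInputStream in) {
        this.in = in;
    }

    @Override
    public int read() throws IOException {
        return in.read();
    }

    @Override
    public void seek(long pos) throws IOException {
        in.seek(pos);
    }

    @Override
    public long getPos() throws IOException {
        return in.getPos();
    }

    @Override
    public boolean seekToNewSource(long targetPos) {
        return false; // single-source stream, nothing to switch to
    }

    @Override
    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
        seek(position); // positioned read via seek + sequential read
        return in.read(buffer, offset, length);
    }

    @Override
    public void readFully(long position, byte[] buffer, int offset, int length)
            throws IOException {
        seek(position);
        int n = 0;
        while (n < length) {
            int count = in.read(buffer, offset + n, length - n);
            if (count < 0) {
                throw new IOException("Unexpected end of stream");
            }
            n += count;
        }
    }

    @Override
    public void readFully(long position, byte[] buffer) throws IOException {
        readFully(position, buffer, 0, buffer.length);
    }
}
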
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index 7b3bfb6e9..5c99d4058 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -55,7 +55,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.types.DataTypeChecks.getFieldTypes;
 
-/** Orc {@link FileFormat}. The main code is copied from Flink {@code OrcFileFormatFactory}. */
+/** Orc {@link FileFormat}. */
 public class OrcFileFormat extends FileFormat {
 
     public static final String IDENTIFIER = "orc";
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 12de77b24..b5267fd7b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -48,7 +48,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 
-import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createFlinkVector;
+import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector;
 import static org.apache.paimon.format.orc.reader.OrcSplitReaderUtil.toOrcType;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
@@ -123,10 +123,9 @@ public class OrcReaderFactory implements FormatReaderFactory {
         for (int i = 0; i < vectors.length; i++) {
             String name = tableFieldNames.get(selectedFields[i]);
             DataType type = tableFieldTypes.get(selectedFields[i]);
-            vectors[i] = createFlinkVector(orcBatch.cols[tableFieldNames.indexOf(name)], type);
+            vectors[i] = createPaimonVector(orcBatch.cols[tableFieldNames.indexOf(name)], type);
         }
-        VectorizedColumnBatch flinkColumnBatch = new VectorizedColumnBatch(vectors);
-        return new OrcReaderBatch(orcBatch, flinkColumnBatch, recycler);
+        return new OrcReaderBatch(orcBatch, new VectorizedColumnBatch(vectors), recycler);
     }
 
     // ------------------------------------------------------------------------
@@ -150,17 +149,18 @@ public class OrcReaderFactory implements FormatReaderFactory {
         private final VectorizedRowBatch orcVectorizedRowBatch;
         private final Pool.Recycler<OrcReaderBatch> recycler;
 
-        private final VectorizedColumnBatch flinkColumnBatch;
+        private final VectorizedColumnBatch paimonColumnBatch;
         private final ColumnarRowIterator result;
 
         protected OrcReaderBatch(
                 final VectorizedRowBatch orcVectorizedRowBatch,
-                final VectorizedColumnBatch flinkColumnBatch,
+                final VectorizedColumnBatch paimonColumnBatch,
                 final Pool.Recycler<OrcReaderBatch> recycler) {
             this.orcVectorizedRowBatch = checkNotNull(orcVectorizedRowBatch);
             this.recycler = checkNotNull(recycler);
-            this.flinkColumnBatch = flinkColumnBatch;
-            this.result = new ColumnarRowIterator(new ColumnarRow(flinkColumnBatch), this::recycle);
+            this.paimonColumnBatch = paimonColumnBatch;
+            this.result =
+                    new ColumnarRowIterator(new ColumnarRow(paimonColumnBatch), this::recycle);
         }
 
         /**
@@ -177,10 +177,10 @@ public class OrcReaderFactory implements FormatReaderFactory {
         }
 
         private RecordIterator<InternalRow> convertAndGetIterator(VectorizedRowBatch orcBatch) {
-            // no copying from the ORC column vectors to the Flink columns vectors necessary,
+            // no copying from the ORC column vectors to the Paimon columns vectors necessary,
             // because they internally point to the same data arrays
             int batchSize = orcBatch.size;
-            flinkColumnBatch.setNumRows(batchSize);
+            paimonColumnBatch.setNumRows(batchSize);
             result.set(batchSize);
             return result;
         }
@@ -350,7 +350,7 @@ public class OrcReaderFactory implements FormatReaderFactory {
 
         OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
 
-        // configure filesystem from Flink filesystem
+        // configure filesystem from Paimon FileIO
         readerOptions.filesystem(new HadoopReadOnlyFileSystem(fileIO));
 
         return OrcFile.createReader(hPath, readerOptions);
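
Each OrcReaderBatch above carries a Pool.Recycler so the large vectorized
buffers are returned and reused once an iterator is exhausted instead of being
reallocated per batch. A small stand-in sketch of that pooling pattern (not
Paimon's actual org.apache.paimon.utils.Pool):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BatchPoolSketch<T> {

    public interface Recycler<T> {
        void recycle(T object);
    }

    private final BlockingQueue<T> pool;

    public BatchPoolSketch(int capacity) {
        this.pool = new ArrayBlockingQueue<>(capacity);
    }

    // Pre-populate the pool with reusable batches.
    public void add(T object) {
        pool.add(object);
    }

    // Blocks until a pooled batch is available again.
    public T pollEntry() throws InterruptedException {
        return pool.take();
    }

    // Handed to each batch so it can return itself when fully consumed.
    public Recycler<T> recycler() {
        return pool::add;
    }
}
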
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/ThreadLocalClassLoaderConfiguration.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/ThreadLocalClassLoaderConfiguration.java
index d03845896..4ce5229c4 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/ThreadLocalClassLoaderConfiguration.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/ThreadLocalClassLoaderConfiguration.java
@@ -24,8 +24,8 @@ import java.net.URL;
 /**
  * Workaround for https://issues.apache.org/jira/browse/ORC-653.
  *
- * <p>Since the conf is effectively cached across Flink jobs, at least force the thread local
- * classloader to avoid classloader leaks.
+ * <p>Since the conf is effectively cached across jobs, at least force the thread local classloader
+ * to avoid classloader leaks.
  */
 public final class ThreadLocalClassLoaderConfiguration extends Configuration {
     public ThreadLocalClassLoaderConfiguration() {}
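
A hedged sketch of what such a workaround can look like: resolve the current
thread's context classloader at lookup time instead of caching a reference
(the exact override set is an assumption, not a copy of the class body):

import org.apache.hadoop.conf.Configuration;

public final class ThreadLocalClassLoaderConfigurationSketch extends Configuration {

    @Override
    public ClassLoader getClassLoader() {
        // Never pin a job classloader in the cached conf; always ask the thread.
        return Thread.currentThread().getContextClassLoader();
    }

    @Override
    public void setClassLoader(ClassLoader classLoader) {
        // Intentionally ignored: the classloader is always thread-local.
    }
}
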
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java
index c887c4135..21154c496 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 
-/** This column vector is used to adapt hive's ColumnVector to Flink's ColumnVector. */
+/** This column vector is used to adapt hive's ColumnVector to Paimon's ColumnVector. */
 public abstract class AbstractOrcColumnVector
         implements org.apache.paimon.data.columnar.ColumnVector {
 
@@ -49,7 +49,7 @@ public abstract class AbstractOrcColumnVector
         return !vector.noNulls && vector.isNull[vector.isRepeating ? 0 : i];
     }
 
-    public static org.apache.paimon.data.columnar.ColumnVector createFlinkVector(
+    public static org.apache.paimon.data.columnar.ColumnVector createPaimonVector(
             ColumnVector vector, DataType dataType) {
         if (vector instanceof LongColumnVector) {
             if (dataType.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java
index df9944975..fd075417c 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java
@@ -25,23 +25,23 @@ import org.apache.paimon.types.ArrayType;
 
 import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
 
-/** This column vector is used to adapt hive's ListColumnVector to Flink's ArrayColumnVector. */
+/** This column vector is used to adapt hive's ListColumnVector to Paimon's ArrayColumnVector. */
 public class OrcArrayColumnVector extends AbstractOrcColumnVector
         implements org.apache.paimon.data.columnar.ArrayColumnVector {
 
     private final ListColumnVector hiveVector;
-    private final ColumnVector flinkVector;
+    private final ColumnVector paimonVector;
 
     public OrcArrayColumnVector(ListColumnVector hiveVector, ArrayType type) {
         super(hiveVector);
         this.hiveVector = hiveVector;
-        this.flinkVector = createFlinkVector(hiveVector.child, type.getElementType());
+        this.paimonVector = createPaimonVector(hiveVector.child, type.getElementType());
     }
 
     @Override
     public InternalArray getArray(int i) {
         long offset = hiveVector.offsets[i];
         long length = hiveVector.lengths[i];
-        return new ColumnarArray(flinkVector, (int) offset, (int) length);
+        return new ColumnarArray(paimonVector, (int) offset, (int) length);
     }
 }
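
The offsets/lengths indexing above is the usual zero-copy list layout: element
data for all rows lives in one flat child vector, and each row's array is just
an (offset, length) window into it. A tiny self-contained illustration:

public class OffsetLengthListDemo {
    public static void main(String[] args) {
        int[] childData = {10, 20, 30, 40, 50}; // flattened elements of all rows
        long[] offsets = {0, 2, 2};             // where each row's list starts
        long[] lengths = {2, 0, 3};             // how many elements each row has

        for (int row = 0; row < offsets.length; row++) {
            StringBuilder sb = new StringBuilder("row " + row + ": [");
            for (int i = 0; i < lengths[row]; i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(childData[(int) offsets[row] + i]);
            }
            System.out.println(sb.append(']'));
        }
        // Prints: row 0: [10, 20], row 1: [], row 2: [30, 40, 50]
    }
}
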
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java
index 739f0d53a..d48bad886 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java
@@ -20,7 +20,7 @@ package org.apache.paimon.format.orc.reader;
 
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 
-/** This column vector is used to adapt hive's BytesColumnVector to Flink's BytesColumnVector. */
+/** This column vector is used to adapt hive's BytesColumnVector to Paimon's BytesColumnVector. */
 public class OrcBytesColumnVector extends AbstractOrcColumnVector
         implements org.apache.paimon.data.columnar.BytesColumnVector {
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java
index f71091b54..9ea4d763a 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import java.math.BigDecimal;
 
 /**
- * This column vector is used to adapt hive's DecimalColumnVector to Flink's DecimalColumnVector.
+ * This column vector is used to adapt hive's DecimalColumnVector to Paimon's DecimalColumnVector.
  */
 public class OrcDecimalColumnVector extends AbstractOrcColumnVector
         implements org.apache.paimon.data.columnar.DecimalColumnVector {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java
index ed8d60df9..0c0b0cc51 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java
@@ -21,7 +21,7 @@ package org.apache.paimon.format.orc.reader;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 
 /**
- * This column vector is used to adapt hive's DoubleColumnVector to Flink's float and double
+ * This column vector is used to adapt hive's DoubleColumnVector to Paimon's float and double
  * ColumnVector.
  */
 public class OrcDoubleColumnVector extends AbstractOrcColumnVector
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java
index 3e49b20aa..e7dfe0e61 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java
@@ -21,7 +21,7 @@ package org.apache.paimon.format.orc.reader;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 
 /**
- * This column vector is used to adapt hive's LongColumnVector to Flink's boolean, byte, short, int
+ * This column vector is used to adapt hive's LongColumnVector to Paimon's boolean, byte, short, int
  * and long ColumnVector.
  */
 public class OrcLongColumnVector extends AbstractOrcColumnVector
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java
index dc937f32d..9b6025142 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java
@@ -25,25 +25,25 @@ import org.apache.paimon.types.MapType;
 
 import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 
-/** This column vector is used to adapt hive's MapColumnVector to Flink's MapColumnVector. */
+/** This column vector is used to adapt hive's MapColumnVector to Paimon's MapColumnVector. */
 public class OrcMapColumnVector extends AbstractOrcColumnVector
         implements org.apache.paimon.data.columnar.MapColumnVector {
 
     private final MapColumnVector hiveVector;
-    private final ColumnVector keyFlinkVector;
-    private final ColumnVector valueFlinkVector;
+    private final ColumnVector keyPaimonVector;
+    private final ColumnVector valuePaimonVector;
 
     public OrcMapColumnVector(MapColumnVector hiveVector, MapType type) {
         super(hiveVector);
         this.hiveVector = hiveVector;
-        this.keyFlinkVector = createFlinkVector(hiveVector.keys, type.getKeyType());
-        this.valueFlinkVector = createFlinkVector(hiveVector.values, type.getValueType());
+        this.keyPaimonVector = createPaimonVector(hiveVector.keys, type.getKeyType());
+        this.valuePaimonVector = createPaimonVector(hiveVector.values, type.getValueType());
     }
 
     @Override
     public InternalMap getMap(int i) {
         long offset = hiveVector.offsets[i];
         long length = hiveVector.lengths[i];
-        return new ColumnarMap(keyFlinkVector, valueFlinkVector, (int) offset, (int) length);
+        return new ColumnarMap(keyPaimonVector, valuePaimonVector, (int) offset, (int) length);
     }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java
index f9c2388eb..698655e8a 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java
@@ -34,11 +34,11 @@ public class OrcRowColumnVector extends AbstractOrcColumnVector
     public OrcRowColumnVector(StructColumnVector hiveVector, RowType type) {
         super(hiveVector);
         int len = hiveVector.fields.length;
-        ColumnVector[] flinkVectors = new ColumnVector[len];
+        ColumnVector[] paimonVectors = new ColumnVector[len];
         for (int i = 0; i < len; i++) {
-            flinkVectors[i] = createFlinkVector(hiveVector.fields[i], type.getTypeAt(i));
+            paimonVectors[i] = createPaimonVector(hiveVector.fields[i], type.getTypeAt(i));
         }
-        this.columnarRow = new ColumnarRow(new VectorizedColumnBatch(flinkVectors));
+        this.columnarRow = new ColumnarRow(new VectorizedColumnBatch(paimonVectors));
     }
 
     @Override
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java
index 48a692ee8..85d2cfc7c 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcSplitReaderUtil.java
@@ -32,7 +32,6 @@ import org.apache.orc.TypeDescription;
 /** Util for orc types. */
 public class OrcSplitReaderUtil {
 
-    /** See {@code org.apache.flink.table.catalog.hive.util.HiveTypeUtil}. */
     public static TypeDescription toOrcType(DataType type) {
         type = type.copy(true);
         switch (type.getTypeRoot()) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java
index 70c5ab58c..1da7232fc 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 
 /**
- * This column vector is used to adapt hive's TimestampColumnVector to Flink's
+ * This column vector is used to adapt hive's TimestampColumnVector to Paimon's
  * TimestampColumnVector.
  */
 public class OrcTimestampColumnVector extends AbstractOrcColumnVector
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/PhysicalWriterImpl.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/PhysicalWriterImpl.java
index f915b469c..20112604e 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/PhysicalWriterImpl.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/PhysicalWriterImpl.java
@@ -47,7 +47,7 @@ import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize;
  * A slightly customised clone of {@link org.apache.orc.impl.PhysicalFsWriter}.
  *
  * <p>Whereas PhysicalFsWriter implementation works on the basis of a Path, this implementation
- * leverages Flink's {@link PositionOutputStream} to write the compressed data.
+ * leverages Paimon's {@link PositionOutputStream} to write the compressed data.
  *
  * <p>NOTE: If the ORC dependency version is updated, this file may have to be updated as well to be
  * in sync with the new version's PhysicalFsWriter.
@@ -222,7 +222,7 @@ public class PhysicalWriterImpl implements PhysicalWriter {
     @Override
     public void close() {
         // Just release the codec but don't close the internal stream here to avoid
-        // Stream Closed or ClosedChannelException when Flink performs checkpoint.
+        // Stream Closed or ClosedChannelException when a checkpoint is performed.
         OrcCodecPool.returnCodec(compress, codec);
         codec = null;
     }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 854905358..24ed57c33 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -151,7 +151,7 @@ public class ParquetReaderFactory implements FormatReaderFactory {
             }
         }
 
-        return Types.buildMessage().addFields(types).named("flink-parquet");
+        return Types.buildMessage().addFields(types).named("paimon-parquet");
     }
 
     private void checkSchema(MessageType fileSchema, MessageType requestedSchema)
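
The renamed message above is assembled with Parquet's Types builder, where the
message name is only a label. A minimal sketch of that builder (field names
and types here are illustrative):

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class ParquetMessageTypeDemo {
    public static void main(String[] args) {
        // Each named() closes one field; the final named() builds the message.
        MessageType schema =
                Types.buildMessage()
                        .optional(PrimitiveTypeName.INT32).named("id")
                        .optional(PrimitiveTypeName.BINARY).named("name")
                        .named("paimon-parquet");
        System.out.println(schema);
    }
}
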
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
index a8fbb6704..4ebc3e1bf 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
@@ -44,7 +44,7 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LE
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 
-/** Schema converter converts Parquet schema to and from Flink internal types. */
+/** Schema converter converts Parquet schema to and from Paimon internal types. */
 public class ParquetSchemaConverter {
 
     static final String MAP_REPEATED_NAME = "key_value";
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
index fb6f302b4..e91f9096e 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
@@ -31,10 +31,7 @@ import org.apache.parquet.io.OutputFile;
 
 import java.io.IOException;
 
-/**
- * A factory that creates a Parquet {@link FormatWriter}. The factory takes a user-supplied builder
- * to assemble Parquet's writer and then turns it into a Flink {@code BulkWriter}.
- */
+/** A factory that creates a Parquet {@link FormatWriter}. */
 public class ParquetWriterFactory implements FormatWriterFactory {
 
     /** The builder to construct the ParquetWriter. */
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java
index a88a692dc..057cc7c32 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java
@@ -55,7 +55,7 @@ public class ParquetRowDataBuilder
 
     private class ParquetWriteSupport extends WriteSupport<InternalRow> {
 
-        private final MessageType schema = convertToParquetMessageType("flink_schema", rowType);
+        private final MessageType schema = convertToParquetMessageType("paimon_schema", rowType);
 
         private ParquetRowDataWriter writer;
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/PositionOutputStreamAdapter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/PositionOutputStreamAdapter.java
index a4558b10a..72fda99e4 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/PositionOutputStreamAdapter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/PositionOutputStreamAdapter.java
@@ -24,16 +24,16 @@ import java.io.IOException;
 
 import static org.apache.parquet.Preconditions.checkNotNull;
 
-/** An adapter to turn Flink's {@link PositionOutputStream} into a {@link PositionOutputStream}. */
+/** An adapter to turn a Paimon {@link org.apache.paimon.fs.PositionOutputStream} into a Parquet {@link PositionOutputStream}. */
 class PositionOutputStreamAdapter extends PositionOutputStream {
 
-    /** The Flink stream written to. */
+    /** The Paimon stream written to. */
     private final org.apache.paimon.fs.PositionOutputStream out;
 
     /**
      * Create a new PositionOutputStreamAdapter.
      *
-     * @param out The Flink stream written to.
+     * @param out The Paimon stream written to.
      */
     PositionOutputStreamAdapter(org.apache.paimon.fs.PositionOutputStream out) {
         this.out = checkNotNull(out, "out");
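
A hedged sketch of the adapter's shape: every Parquet-side call, including
getPos() used for footer offsets, is forwarded to the wrapped Paimon stream
(the delegation bodies and close() behavior are assumptions, not a copy of the
omitted code):

import org.apache.parquet.io.PositionOutputStream;

import java.io.IOException;

class PositionOutputStreamAdapterSketch extends PositionOutputStream {

    private final org.apache.paimon.fs.PositionOutputStream out;

    PositionOutputStreamAdapterSketch(org.apache.paimon.fs.PositionOutputStream out) {
        this.out = out;
    }

    @Override
    public long getPos() throws IOException {
        return out.getPos();
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
    }

    @Override
    public void write(byte[] buffer, int off, int len) throws IOException {
        out.write(buffer, off, len);
    }

    @Override
    public void flush() throws IOException {
        out.flush();
    }

    @Override
    public void close() throws IOException {
        // Assumption: only flush; the caller owns the underlying stream's lifecycle.
        out.flush();
    }
}
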
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/StreamOutputFile.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/StreamOutputFile.java
index 975f646e2..b7ab0bc52 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/StreamOutputFile.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/StreamOutputFile.java
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /**
- * An implementation of Parquet's {@link OutputFile} interface that goes against a Flink {@link
+ * An implementation of Parquet's {@link OutputFile} interface that goes against a Paimon {@link
  * PositionOutputStream}.
  *
  * <p>Because the implementation goes against an open stream, rather than opening its own streams
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTest.java
index 389611de1..054a3369b 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTest.java
@@ -56,7 +56,7 @@ class AvroBulkFormatTest {
                             BinaryString.fromString("AvroBulk"),
                             BinaryString.fromString("FormatTest")),
                     GenericRow.of(
-                            BinaryString.fromString("Apache"), BinaryString.fromString("Flink")),
+                            BinaryString.fromString("Apache"), BinaryString.fromString("Paimon")),
                     GenericRow.of(
                             BinaryString.fromString(
                                     "永和九年,岁在癸丑,暮春之初,会于会稽山阴之兰亭,修禊事也。群贤毕至,少"
@@ -77,7 +77,7 @@ class AvroBulkFormatTest {
                             BinaryString.fromString("only one record"))
                     // -------- file length 752 --------
                     );
-    private static final List<Long> BLOCK_STARTS = Arrays.asList(232L, 593L, 705L);
+    private static final List<Long> BLOCK_STARTS = Arrays.asList(233L, 595L, 707L);
 
     private File tmpFile;
 
diff --git a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
index dec145e6f..b33168e25 100644
--- a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
+++ b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
@@ -28,7 +28,7 @@ import org.apache.paimon.types.VarCharType;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
-/** Utils for converting types related classes between Flink and Hive. */
+/** Utils for converting types related classes between Paimon and Hive. */
 public class HiveTypeUtils {
 
     public static TypeInfo logicalTypeToTypeInfo(DataType logicalType) {
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
index 5c0f54eff..a50eca546 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
@@ -74,7 +74,7 @@ public class HiveSchema {
             throw new UnsupportedOperationException(
                     "Location property is missing for table "
                             + tableName
-                            + ". Currently Flink paimon only supports external table for Hive "
+                            + ". Currently Paimon only supports external table for Hive "
                             + "so location property must be set.");
         }
         Path path = new Path(location);
@@ -152,7 +152,7 @@ public class HiveSchema {
             throw new IllegalArgumentException(
                     "Hive DDL and paimon schema mismatched! "
                             + "It is recommended not to write any column definition "
-                            + "as Flink paimon external table can read schema from the specified location.\n"
+                            + "as Paimon external table can read schema from the specified location.\n"
                             + "Mismatched fields are:\n"
                             + String.join("--------------------\n", mismatched));
         }
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
index 703595096..6259fcb71 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
@@ -36,7 +36,7 @@ public class PaimonOutputFormat implements OutputFormat<InternalRow, InternalRow
             FileSystem fileSystem, JobConf jobConf, String s, Progressable progressable)
             throws IOException {
         throw new UnsupportedOperationException(
-                "Flink paimon currently can only be used as an input format for Hive.");
+                "Paimon currently can only be used as an input format for Hive.");
     }
 
     @Override
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
index 56e880c4c..f11ef4990 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
@@ -37,7 +37,7 @@ public class PaimonDateObjectInspector extends AbstractPrimitiveJavaObjectInspec
 
     @Override
     public Date getPrimitiveJavaObject(Object o) {
-        // Flink stores date as an integer (epoch day, 1970-01-01 = day 0)
+        // Paimon stores date as an integer (epoch day, 1970-01-01 = day 0)
         // while constructor of Date accepts epoch millis
         return o == null ? null : DateTimeUtils.toSQLDate((Integer) o);
     }
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTableSchemaTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTableSchemaTest.java
index c3e9e1db3..3e9a63620 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTableSchemaTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTableSchemaTest.java
@@ -116,7 +116,7 @@ public class HiveTableSchemaTest {
                         "\n",
                         "Hive DDL and paimon schema mismatched! "
                                 + "It is recommended not to write any column definition "
-                                + "as Flink paimon external table can read schema from the specified location.",
+                                + "as Paimon external table can read schema from the specified location.",
                         "Mismatched fields are:",
                         "Field #1",
                         "Hive DDL          : mismatched string",
@@ -145,7 +145,7 @@ public class HiveTableSchemaTest {
                         "\n",
                         "Hive DDL and paimon schema mismatched! "
                                 + "It is recommended not to write any column definition "
-                                + "as Flink paimon external table can read schema from the specified location.",
+                                + "as Paimon external table can read schema from the specified location.",
                         "Mismatched fields are:",
                         "Field #1",
                         "Hive DDL          : null",
@@ -183,7 +183,7 @@ public class HiveTableSchemaTest {
                         "\n",
                         "Hive DDL and paimon schema mismatched! "
                                 + "It is recommended not to write any column definition "
-                                + "as Flink paimon external table can read schema from the specified location.",
+                                + "as Paimon external table can read schema from the specified location.",
                         "Mismatched fields are:",
                         "Field #3",
                         "Hive DDL          : d int",
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
index 7cf939350..04c8a1d68 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
@@ -777,7 +777,7 @@ public class PaimonStorageHandlerITCase {
                         Collections.emptyList(),
                         Collections.emptyList());
 
-        // TODO add NaN related tests after FLINK-27627 and FLINK-27628 are fixed
+        // TODO add NaN related tests
 
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
diff --git a/paimon-spark/paimon-spark-2/src/main/java/org/apache/paimon/spark/SparkDataSourceReader.java b/paimon-spark/paimon-spark-2/src/main/java/org/apache/paimon/spark/SparkDataSourceReader.java
index 6d35dfdfd..388b0c688 100644
--- a/paimon-spark/paimon-spark-2/src/main/java/org/apache/paimon/spark/SparkDataSourceReader.java
+++ b/paimon-spark/paimon-spark-2/src/main/java/org/apache/paimon/spark/SparkDataSourceReader.java
@@ -117,7 +117,7 @@ public class SparkDataSourceReader
     @Override
     public StructType readSchema() {
         RowType rowType = table.rowType();
-        return SparkTypeUtils.fromFlinkRowType(
+        return SparkTypeUtils.fromPaimonRowType(
                 projectedFields == null ? rowType : TypeUtils.project(rowType, projectedFields));
     }
 
diff --git a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
index 26a89c4ec..8d9ac48f7 100644
--- a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
+++ b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
@@ -84,7 +84,7 @@ public class SparkInternalRowTest {
 
         Function1<Object, Object> sparkConverter =
                 CatalystTypeConverters.createToScalaConverter(
-                        SparkTypeUtils.fromFlinkType(ALL_TYPES));
+                        SparkTypeUtils.fromPaimonType(ALL_TYPES));
         org.apache.spark.sql.Row sparkRow =
                 (org.apache.spark.sql.Row)
                         sparkConverter.apply(new SparkInternalRow(ALL_TYPES).replace(rowData));
diff --git a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 784957610..b374966a8 100644
--- a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -62,7 +62,7 @@ public class SparkReadITCase {
         Path warehousePath = new Path("file:" + warehouse);
         spark = SparkSession.builder().master("local[2]").getOrCreate();
 
-        // Flink sink
+        // Paimon sink
         tablePath1 = new Path(warehousePath, "default.db/t1");
         SimpleTableTestHelper testHelper1 = createTestHelper(tablePath1);
         testHelper1.write(GenericRow.of(1, 2L, BinaryString.fromString("1")));
diff --git a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
index 7220d2246..f5cbf8db9 100644
--- a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
+++ b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
@@ -24,7 +24,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Test;
 
-import static org.apache.paimon.spark.SparkTypeUtils.fromFlinkRowType;
+import static org.apache.paimon.spark.SparkTypeUtils.fromPaimonRowType;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link SparkTypeUtils}. */
@@ -90,7 +90,7 @@ public class SparkTypeTest {
                         + "StructField(decimal2,DecimalType(38,2),true), "
                         + "StructField(decimal3,DecimalType(10,1),true))";
 
-        StructType sparkType = fromFlinkRowType(ALL_TYPES);
+        StructType sparkType = fromPaimonRowType(ALL_TYPES);
         assertThat(sparkType.toString()).isEqualTo(expected);
     }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkArrayData.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkArrayData.java
index 654661202..c99f857c9 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkArrayData.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkArrayData.java
@@ -33,11 +33,11 @@ import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
-import static org.apache.paimon.spark.SparkInternalRow.fromFlink;
+import static org.apache.paimon.spark.SparkInternalRow.fromPaimon;
 import static org.apache.paimon.utils.RowDataUtils.copyArray;
 import static org.apache.paimon.utils.TypeUtils.timestampPrecision;
 
-/** Spark {@link ArrayData} to wrap flink {@code ArrayData}. */
+/** Spark {@link ArrayData} to wrap Paimon {@link InternalArray}. */
 public class SparkArrayData extends ArrayData {
 
     private final DataType elementType;
@@ -67,7 +67,7 @@ public class SparkArrayData extends ArrayData {
     public Object[] array() {
         Object[] objects = new Object[numElements()];
         for (int i = 0; i < objects.length; i++) {
-            objects[i] = fromFlink(RowDataUtils.get(array, i, elementType), elementType);
+            objects[i] = fromPaimon(RowDataUtils.get(array, i, elementType), elementType);
         }
         return objects;
     }
@@ -117,7 +117,7 @@ public class SparkArrayData extends ArrayData {
     }
 
     private long getTimestampMicros(int ordinal) {
-        return fromFlink(array.getTimestamp(ordinal, timestampPrecision(elementType)));
+        return fromPaimon(array.getTimestamp(ordinal, timestampPrecision(elementType)));
     }
 
     @Override
@@ -132,12 +132,12 @@ public class SparkArrayData extends ArrayData {
 
     @Override
     public Decimal getDecimal(int ordinal, int precision, int scale) {
-        return fromFlink(array.getDecimal(ordinal, precision, scale));
+        return fromPaimon(array.getDecimal(ordinal, precision, scale));
     }
 
     @Override
     public UTF8String getUTF8String(int ordinal) {
-        return fromFlink(array.getString(ordinal));
+        return fromPaimon(array.getString(ordinal));
     }
 
     @Override
@@ -152,17 +152,17 @@ public class SparkArrayData extends ArrayData {
 
     @Override
     public InternalRow getStruct(int ordinal, int numFields) {
-        return fromFlink(array.getRow(ordinal, numFields), (RowType) elementType);
+        return fromPaimon(array.getRow(ordinal, numFields), (RowType) elementType);
     }
 
     @Override
     public ArrayData getArray(int ordinal) {
-        return fromFlink(array.getArray(ordinal), (ArrayType) elementType);
+        return fromPaimon(array.getArray(ordinal), (ArrayType) elementType);
     }
 
     @Override
     public MapData getMap(int ordinal) {
-        return fromFlink(array.getMap(ordinal), elementType);
+        return fromPaimon(array.getMap(ordinal), elementType);
     }
 
     @Override
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 12e91ca97..9196bfef3 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -61,7 +61,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.spark.SparkTypeUtils.toFlinkType;
+import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 
 /** Spark {@link TableCatalog} for paimon. */
 public class SparkCatalog implements TableCatalog, SupportsNamespaces {
@@ -269,7 +269,7 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
             SchemaChange.Move move = getMove(add.position(), add.fieldNames());
             return SchemaChange.addColumn(
                     add.fieldNames()[0],
-                    toFlinkType(add.dataType()).copy(add.isNullable()),
+                    toPaimonType(add.dataType()).copy(add.isNullable()),
                     add.comment(),
                     move);
         } else if (change instanceof RenameColumn) {
@@ -284,7 +284,7 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
             UpdateColumnType update = (UpdateColumnType) change;
             validateAlterNestedField(update.fieldNames());
             return SchemaChange.updateColumnType(
-                    update.fieldNames()[0], toFlinkType(update.newDataType()));
+                    update.fieldNames()[0], toPaimonType(update.newDataType()));
         } else if (change instanceof UpdateColumnNullability) {
             UpdateColumnNullability update = (UpdateColumnNullability) change;
             return SchemaChange.updateColumnNullability(update.fieldNames(), update.nullable());
@@ -334,7 +334,7 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
                                 .map(String::trim)
                                 .collect(Collectors.toList());
         return new Schema(
-                ((RowType) toFlinkType(schema)).getFields(),
+                ((RowType) toPaimonType(schema)).getFields(),
                 Arrays.stream(partitions)
                         .map(partition -> partition.references()[0].describe())
                         .collect(Collectors.toList()),
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java
index c0caf6706..f330435a6 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRow.java
@@ -114,7 +114,7 @@ public class SparkInternalRow extends org.apache.spark.sql.catalyst.InternalRow
 
     private long getTimestampMicros(int ordinal) {
         DataType type = rowType.getTypeAt(ordinal);
-        return fromFlink(row.getTimestamp(ordinal, timestampPrecision(type)));
+        return fromPaimon(row.getTimestamp(ordinal, timestampPrecision(type)));
     }
 
     @Override
@@ -130,12 +130,12 @@ public class SparkInternalRow extends org.apache.spark.sql.catalyst.InternalRow
     @Override
     public Decimal getDecimal(int ordinal, int precision, int scale) {
         org.apache.paimon.data.Decimal decimal = row.getDecimal(ordinal, precision, scale);
-        return fromFlink(decimal);
+        return fromPaimon(decimal);
     }
 
     @Override
     public UTF8String getUTF8String(int ordinal) {
-        return fromFlink(row.getString(ordinal));
+        return fromPaimon(row.getString(ordinal));
     }
 
     @Override
@@ -150,17 +150,17 @@ public class SparkInternalRow extends org.apache.spark.sql.catalyst.InternalRow
 
     @Override
     public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal, int numFields) {
-        return fromFlink(row.getRow(ordinal, numFields), (RowType) rowType.getTypeAt(ordinal));
+        return fromPaimon(row.getRow(ordinal, numFields), (RowType) rowType.getTypeAt(ordinal));
     }
 
     @Override
     public ArrayData getArray(int ordinal) {
-        return fromFlink(row.getArray(ordinal), (ArrayType) rowType.getTypeAt(ordinal));
+        return fromPaimon(row.getArray(ordinal), (ArrayType) rowType.getTypeAt(ordinal));
     }
 
     @Override
     public MapData getMap(int ordinal) {
-        return fromFlink(row.getMap(ordinal), rowType.getTypeAt(ordinal));
+        return fromPaimon(row.getMap(ordinal), rowType.getTypeAt(ordinal));
     }
 
     @Override
@@ -168,57 +168,57 @@ public class SparkInternalRow extends org.apache.spark.sql.catalyst.InternalRow
         return SpecializedGettersReader.read(this, ordinal, dataType);
     }
 
-    public static Object fromFlink(Object o, DataType type) {
+    public static Object fromPaimon(Object o, DataType type) {
         if (o == null) {
             return null;
         }
         switch (type.getTypeRoot()) {
             case TIMESTAMP_WITHOUT_TIME_ZONE:
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return fromFlink((Timestamp) o);
+                return fromPaimon((Timestamp) o);
             case CHAR:
             case VARCHAR:
-                return fromFlink((BinaryString) o);
+                return fromPaimon((BinaryString) o);
             case DECIMAL:
-                return fromFlink((org.apache.paimon.data.Decimal) o);
+                return fromPaimon((org.apache.paimon.data.Decimal) o);
             case ARRAY:
-                return fromFlink((InternalArray) o, (ArrayType) type);
+                return fromPaimon((InternalArray) o, (ArrayType) type);
             case MAP:
             case MULTISET:
-                return fromFlink((InternalMap) o, type);
+                return fromPaimon((InternalMap) o, type);
             case ROW:
-                return fromFlink((InternalRow) o, (RowType) type);
+                return fromPaimon((InternalRow) o, (RowType) type);
             default:
                 return o;
         }
     }
 
-    public static UTF8String fromFlink(BinaryString string) {
+    public static UTF8String fromPaimon(BinaryString string) {
         return UTF8String.fromBytes(string.toBytes());
     }
 
-    public static Decimal fromFlink(org.apache.paimon.data.Decimal decimal) {
+    public static Decimal fromPaimon(org.apache.paimon.data.Decimal decimal) {
         return Decimal.apply(decimal.toBigDecimal());
     }
 
-    public static org.apache.spark.sql.catalyst.InternalRow fromFlink(
+    public static org.apache.spark.sql.catalyst.InternalRow fromPaimon(
             InternalRow row, RowType rowType) {
         return new SparkInternalRow(rowType).replace(row);
     }
 
-    public static long fromFlink(Timestamp timestamp) {
+    public static long fromPaimon(Timestamp timestamp) {
         return DateTimeUtils.fromJavaTimestamp(timestamp.toSQLTimestamp());
     }
 
-    public static ArrayData fromFlink(InternalArray array, ArrayType arrayType) {
-        return fromFlinkArrayElementType(array, arrayType.getElementType());
+    public static ArrayData fromPaimon(InternalArray array, ArrayType arrayType) {
+        return fromPaimonArrayElementType(array, arrayType.getElementType());
     }
 
-    private static ArrayData fromFlinkArrayElementType(InternalArray array, DataType elementType) {
+    private static ArrayData fromPaimonArrayElementType(InternalArray array, DataType elementType) {
         return new SparkArrayData(elementType).replace(array);
     }
 
-    public static MapData fromFlink(InternalMap map, DataType mapType) {
+    public static MapData fromPaimon(InternalMap map, DataType mapType) {
         DataType keyType;
         DataType valueType;
         if (mapType instanceof MapType) {
@@ -232,7 +232,7 @@ public class SparkInternalRow extends org.apache.spark.sql.catalyst.InternalRow
         }
 
         return new ArrayBasedMapData(
-                fromFlinkArrayElementType(map.keyArray(), keyType),
-                fromFlinkArrayElementType(map.valueArray(), valueType));
+                fromPaimonArrayElementType(map.keyArray(), keyType),
+                fromPaimonArrayElementType(map.valueArray(), valueType));
     }
 }
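
A hedged usage sketch of the conversions above (RowType.of with default field
names is an assumption about Paimon's type API; GenericRow and BinaryString
are Paimon data classes):

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.spark.SparkInternalRow;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

public class FromPaimonDemo {
    public static void main(String[] args) {
        RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
        GenericRow paimonRow = GenericRow.of(1, BinaryString.fromString("Paimon"));

        // Wrap without copying: Spark reads fields lazily through the Paimon row.
        org.apache.spark.sql.catalyst.InternalRow sparkRow =
                SparkInternalRow.fromPaimon(paimonRow, rowType);

        System.out.println(sparkRow.getInt(0));        // 1
        System.out.println(sparkRow.getUTF8String(1)); // Paimon
    }
}
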
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index efd8f3ae5..1c4b1567c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -94,7 +94,7 @@ public class SparkRow implements InternalRow {
     @Override
     public int getInt(int i) {
         if (type.getTypeAt(i) instanceof DateType) {
-            return toFlinkDate(row.get(i));
+            return toPaimonDate(row.get(i));
         }
         return row.getInt(i);
     }
@@ -126,7 +126,7 @@ public class SparkRow implements InternalRow {
 
     @Override
     public Timestamp getTimestamp(int i, int precision) {
-        return toFlinkTimestamp(row.get(i));
+        return toPaimonTimestamp(row.get(i));
     }
 
     @Override
@@ -136,12 +136,12 @@ public class SparkRow implements InternalRow {
 
     @Override
     public InternalArray getArray(int i) {
-        return new FlinkArray(((ArrayType) type.getTypeAt(i)).getElementType(), row.getList(i));
+        return new PaimonArray(((ArrayType) type.getTypeAt(i)).getElementType(), row.getList(i));
     }
 
     @Override
     public InternalMap getMap(int i) {
-        return toFlinkMap((MapType) type.getTypeAt(i), row.getJavaMap(i));
+        return toPaimonMap((MapType) type.getTypeAt(i), row.getJavaMap(i));
     }
 
     @Override
@@ -149,7 +149,7 @@ public class SparkRow implements InternalRow {
         return new SparkRow((RowType) type.getTypeAt(i), row.getStruct(i));
     }
 
-    private static int toFlinkDate(Object object) {
+    private static int toPaimonDate(Object object) {
         if (object instanceof Date) {
             return DateTimeUtils.toInternal((Date) object);
         } else {
@@ -157,7 +157,7 @@ public class SparkRow implements InternalRow {
         }
     }
 
-    private static Timestamp toFlinkTimestamp(Object object) {
+    private static Timestamp toPaimonTimestamp(Object object) {
         if (object instanceof java.sql.Timestamp) {
             return Timestamp.fromSQLTimestamp((java.sql.Timestamp) object);
         } else {
@@ -165,7 +165,7 @@ public class SparkRow implements InternalRow {
         }
     }
 
-    private static InternalMap toFlinkMap(MapType mapType, Map<Object, Object> map) {
+    private static InternalMap toPaimonMap(MapType mapType, Map<Object, Object> map) {
         List<Object> keys = new ArrayList<>();
         List<Object> values = new ArrayList<>();
         map.forEach(
@@ -174,8 +174,8 @@ public class SparkRow implements InternalRow {
                     values.add(v);
                 });
 
-        FlinkArray key = new FlinkArray(mapType.getKeyType(), keys);
-        FlinkArray value = new FlinkArray(mapType.getValueType(), values);
+        PaimonArray key = new PaimonArray(mapType.getKeyType(), keys);
+        PaimonArray value = new PaimonArray(mapType.getValueType(), values);
         return new InternalMap() {
             @Override
             public int size() {
@@ -194,12 +194,12 @@ public class SparkRow implements InternalRow {
         };
     }
 
-    private static class FlinkArray implements InternalArray {
+    private static class PaimonArray implements InternalArray {
 
         private final DataType elementType;
         private final List<Object> list;
 
-        private FlinkArray(DataType elementType, List<Object> list) {
+        private PaimonArray(DataType elementType, List<Object> list) {
             this.list = list;
             this.elementType = elementType;
         }
@@ -237,7 +237,7 @@ public class SparkRow implements InternalRow {
         @Override
         public int getInt(int i) {
             if (elementType instanceof DateType) {
-                return toFlinkDate(getAs(i));
+                return toPaimonDate(getAs(i));
             }
             return getAs(i);
         }
@@ -269,7 +269,7 @@ public class SparkRow implements InternalRow {
 
         @Override
         public Timestamp getTimestamp(int i, int precision) {
-            return toFlinkTimestamp(getAs(i));
+            return toPaimonTimestamp(getAs(i));
         }
 
         @Override
@@ -279,12 +279,12 @@ public class SparkRow implements InternalRow {
 
         @Override
         public InternalArray getArray(int i) {
-            return new FlinkArray(((ArrayType) elementType).getElementType(), getAs(i));
+            return new PaimonArray(((ArrayType) elementType).getElementType(), getAs(i));
         }
 
         @Override
         public InternalMap getMap(int i) {
-            return toFlinkMap((MapType) elementType, getAs(i));
+            return toPaimonMap((MapType) elementType, getAs(i));
         }
 
         @Override
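
The renamed helpers above convert Spark's external values into Paimon's internal representations: a DATE becomes an int of days since epoch, and a TIMESTAMP becomes Paimon's Timestamp wrapper. A minimal standalone sketch of that mapping — the DateTimeUtils.toInternal and Timestamp.fromSQLTimestamp calls appear in the diff itself, while the package locations and the getMillisecond accessor are assumptions:

    import java.sql.Date;
    import org.apache.paimon.data.Timestamp;
    import org.apache.paimon.utils.DateTimeUtils;

    public class PaimonDateTimeSketch {
        public static void main(String[] args) {
            // toPaimonDate reduces a java.sql.Date to days since 1970-01-01.
            int days = DateTimeUtils.toInternal(Date.valueOf("1970-01-02"));
            System.out.println(days); // expected 1, under local-time interpretation

            // toPaimonTimestamp wraps a java.sql.Timestamp in Paimon's Timestamp.
            Timestamp ts =
                    Timestamp.fromSQLTimestamp(java.sql.Timestamp.valueOf("2023-03-20 14:27:20"));
            System.out.println(ts.getMillisecond()); // accessor name assumed
        }
    }
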
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
index 5a2c3c691..d05081fa0 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
@@ -55,7 +55,7 @@ public class SparkScan implements Scan, SupportsReportStatistics {
 
     @Override
     public StructType readSchema() {
-        return SparkTypeUtils.fromFlinkRowType(readBuilder.readType());
+        return SparkTypeUtils.fromPaimonRowType(readBuilder.readType());
     }
 
     @Override
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index cd1c9cb40..c462eb56d 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -74,7 +74,7 @@ public class SparkTable
 
     @Override
     public StructType schema() {
-        return SparkTypeUtils.fromFlinkRowType(table.rowType());
+        return SparkTypeUtils.fromPaimonRowType(table.rowType());
     }
 
     @Override
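
Both SparkScan.readSchema() and SparkTable.schema() now report a Paimon row type to Spark through the renamed SparkTypeUtils.fromPaimonRowType. A minimal sketch of that call; RowType.of and org.apache.paimon.types.DataTypes are assumed builder APIs for the input, only fromPaimonRowType itself is confirmed by this diff:

    import org.apache.paimon.spark.SparkTypeUtils;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;
    import org.apache.spark.sql.types.StructType;

    public class SchemaSketch {
        public static void main(String[] args) {
            // A two-column Paimon row type, converted to a Spark StructType.
            RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
            StructType sparkSchema = SparkTypeUtils.fromPaimonRowType(rowType);
            System.out.println(sparkSchema); // e.g. StructType(StructField(f0,IntegerType,true),...)
        }
    }
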
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
index ae4456f8a..75d39c22c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
@@ -57,21 +57,21 @@ public class SparkTypeUtils {
 
     private SparkTypeUtils() {}
 
-    public static StructType fromFlinkRowType(RowType type) {
-        return (StructType) fromFlinkType(type);
+    public static StructType fromPaimonRowType(RowType type) {
+        return (StructType) fromPaimonType(type);
     }
 
-    public static DataType fromFlinkType(org.apache.paimon.types.DataType type) {
-        return type.accept(FlinkToSparkTypeVisitor.INSTANCE);
+    public static DataType fromPaimonType(org.apache.paimon.types.DataType type) {
+        return type.accept(PaimonToSparkTypeVisitor.INSTANCE);
     }
 
-    public static org.apache.paimon.types.DataType toFlinkType(DataType dataType) {
-        return SparkToFlinkTypeVisitor.visit(dataType);
+    public static org.apache.paimon.types.DataType toPaimonType(DataType dataType) {
+        return SparkToPaimonTypeVisitor.visit(dataType);
     }
 
-    private static class FlinkToSparkTypeVisitor extends DataTypeDefaultVisitor<DataType> {
+    private static class PaimonToSparkTypeVisitor extends DataTypeDefaultVisitor<DataType> {
 
-        private static final FlinkToSparkTypeVisitor INSTANCE = new FlinkToSparkTypeVisitor();
+        private static final PaimonToSparkTypeVisitor INSTANCE = new PaimonToSparkTypeVisitor();
 
         @Override
         public DataType visit(CharType charType) {
@@ -190,16 +190,16 @@ public class SparkTypeUtils {
         }
     }
 
-    private static class SparkToFlinkTypeVisitor {
+    private static class SparkToPaimonTypeVisitor {
 
         private final AtomicInteger currentIndex = new AtomicInteger(0);
 
         static org.apache.paimon.types.DataType visit(DataType type) {
-            return visit(type, new SparkToFlinkTypeVisitor());
+            return visit(type, new SparkToPaimonTypeVisitor());
         }
 
         static org.apache.paimon.types.DataType visit(
-                DataType type, SparkToFlinkTypeVisitor visitor) {
+                DataType type, SparkToPaimonTypeVisitor visitor) {
             if (type instanceof StructType) {
                 StructField[] fields = ((StructType) type).fields();
                 List<org.apache.paimon.types.DataType> fieldResults =
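
The reverse direction goes through the renamed SparkToPaimonTypeVisitor, which recurses into StructType fields (tracking field ids with the AtomicInteger) and maps atomic Spark types to Paimon types. A sketch of the public entry point, assuming only Spark's standard org.apache.spark.sql.types API on the input side:

    import org.apache.paimon.spark.SparkTypeUtils;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class ToPaimonTypeSketch {
        public static void main(String[] args) {
            // An atomic type: Spark's IntegerType should map to Paimon's INT.
            System.out.println(SparkTypeUtils.toPaimonType(DataTypes.IntegerType));

            // A nested type: each StructField is visited in order.
            StructType struct =
                    new StructType()
                            .add("id", DataTypes.LongType)
                            .add("name", DataTypes.StringType);
            System.out.println(SparkTypeUtils.toPaimonType(struct));
        }
    }
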
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/MinioTestContainer.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/MinioTestContainer.java
index b3c331f4b..6adafded3 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/MinioTestContainer.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/MinioTestContainer.java
@@ -43,7 +43,7 @@ import java.util.Map;
 public class MinioTestContainer extends GenericContainer<MinioTestContainer>
         implements BeforeAllCallback, AfterAllCallback {
 
-    private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint";
+    private static final String PAIMON_CONFIG_S3_ENDPOINT = "s3.endpoint";
 
     private static final int DEFAULT_PORT = 9000;
 
@@ -110,7 +110,7 @@ public class MinioTestContainer extends GenericContainer<MinioTestContainer>
 
     public Map<String, String> getS3ConfigOptions() {
         Map<String, String> config = new HashMap<>();
-        config.put(FLINK_CONFIG_S3_ENDPOINT, getHttpEndpoint());
+        config.put(PAIMON_CONFIG_S3_ENDPOINT, getHttpEndpoint());
         config.put("s3.path.style.access", "true");
         config.put("s3.access.key", accessKey);
         config.put("s3.secret.key", secretKey);
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
index 866ab1e98..7f11b6d5d 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkInternalRowTest.java
@@ -81,7 +81,7 @@ public class SparkInternalRowTest {
 
         Function1<Object, Object> sparkConverter =
                 CatalystTypeConverters.createToScalaConverter(
-                        SparkTypeUtils.fromFlinkType(ALL_TYPES));
+                        SparkTypeUtils.fromPaimonType(ALL_TYPES));
         org.apache.spark.sql.Row sparkRow =
                 (org.apache.spark.sql.Row)
                         sparkConverter.apply(new SparkInternalRow(ALL_TYPES).replace(rowData));
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index 0d539f900..36454f6f2 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -79,7 +79,7 @@ public abstract class SparkReadTestBase {
     @BeforeEach
     public void beforeEach() throws Exception {
 
-        // flink sink
+        // Paimon sink
         tablePath1 = new Path(warehousePath, "default.db/t1");
         createTable("t1");
         writeTable(
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
index 1df0e93d2..d72c4415f 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
@@ -26,8 +26,8 @@ import org.junit.jupiter.api.Test;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.paimon.spark.SparkTypeUtils.fromFlinkRowType;
-import static org.apache.paimon.spark.SparkTypeUtils.toFlinkType;
+import static org.apache.paimon.spark.SparkTypeUtils.fromPaimonRowType;
+import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link SparkTypeUtils}. */
@@ -96,9 +96,9 @@ public class SparkTypeTest {
                         + "StructField(decimal2,DecimalType(38,2),true),"
                         + "StructField(decimal3,DecimalType(10,1),true))";
 
-        StructType sparkType = fromFlinkRowType(ALL_TYPES);
+        StructType sparkType = fromPaimonRowType(ALL_TYPES);
         assertThat(sparkType.toString().replace(", ", ",")).isEqualTo(expected);
 
-        assertThat(toFlinkType(sparkType)).isEqualTo(ALL_TYPES);
+        assertThat(toPaimonType(sparkType)).isEqualTo(ALL_TYPES);
     }
 }
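
The test above asserts a lossless round trip: fromPaimonRowType followed by toPaimonType reproduces the original RowType. A small self-contained version of the same property, assuming RowType.of and DataTypes for the fixture (the ALL_TYPES fixture lives elsewhere in the test sources) and printing rather than asserting:

    import org.apache.paimon.spark.SparkTypeUtils;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;
    import org.apache.spark.sql.types.StructType;

    public class RoundTripSketch {
        public static void main(String[] args) {
            RowType original = RowType.of(DataTypes.BIGINT(), DataTypes.DOUBLE());
            StructType spark = SparkTypeUtils.fromPaimonRowType(original);
            // Same normalization the test applies before comparing strings.
            System.out.println(spark.toString().replace(", ", ","));
            // With matching field names the round trip should be lossless.
            System.out.println(SparkTypeUtils.toPaimonType(spark));
        }
    }
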
diff --git a/paimon-spark/pom.xml b/paimon-spark/pom.xml
index a618cbfce..cbc4594fc 100644
--- a/paimon-spark/pom.xml
+++ b/paimon-spark/pom.xml
@@ -46,8 +46,6 @@ under the License.
     </modules>
 
     <dependencies>
-        <!-- Flink All dependencies -->
-
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-shade</artifactId>