Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/09/15 11:47:07 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connector-V2] Refactor hdfs file sink connector code structure (#2701)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6129c0256 [Improve][Connector-V2] Refactor hdfs file sink connector code structure (#2701)
6129c0256 is described below

commit 6129c025678ec25ee155d89084f52d324b9f364a
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Thu Sep 15 19:47:01 2022 +0800

    [Improve][Connector-V2] Refactor hdfs file sink connector code structure (#2701)
    
    * [Improve][Connector-V2] Refactor hdfs file sink codes
---
 docs/en/connector-v2/sink/HdfsFile.md              |  44 ++--
 .../seatunnel/file/hdfs/sink/HdfsFileSink.java     |  30 ++-
 .../file/hdfs/sink/HdfsFileSinkPlugin.java         |  69 ------
 .../file/hdfs/sink/filesystem/HdfsFileSystem.java  |  40 ----
 .../sink/filesystem/HdfsFileSystemCommitter.java   |  54 -----
 .../seatunnel/file/hdfs/sink/util/HdfsUtils.java   | 138 ------------
 .../writer/HdfsJsonTransactionStateFileWriter.java | 122 -----------
 .../writer/HdfsOrcTransactionStateFileWriter.java  | 244 ---------------------
 .../HdfsParquetTransactionStateFileWriter.java     | 169 --------------
 .../HdfsTransactionStateFileWriteFactory.java      | 115 ----------
 .../writer/HdfsTxtTransactionStateFileWriter.java  | 125 -----------
 .../hdfs/sink/FileSinkAggregatedCommitterTest.java | 147 -------------
 .../TestHdfsTxtTransactionStateFileWriter.java     | 109 ---------
 .../e2e/flink/v2/file/FakeSourceToFileIT.java      |  27 ---
 .../resources/file/fakesource_to_hdfs_json.conf    |  70 ------
 .../resources/file/fakesource_to_hdfs_parquet.conf |  71 ------
 .../resources/file/fakesource_to_hdfs_text.conf    |  71 ------
 .../e2e/spark/v2/file/FakeSourceToFileIT.java      |  28 ---
 .../resources/file/fakesource_to_hdfs_json.conf    |  69 ------
 .../resources/file/fakesource_to_hdfs_parquet.conf |  70 ------
 .../resources/file/fakesource_to_hdfs_text.conf    |  70 ------
 21 files changed, 51 insertions(+), 1831 deletions(-)

diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index e2e3b7561..928156760 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -24,24 +24,29 @@ By default, we use 2PC commit to ensure `exactly-once`
 
 In order to use this connector, you must ensure your Spark/Flink cluster has already integrated Hadoop. The tested Hadoop version is 2.x.
 
-| name                              | type   | required | default value                                           |
-| --------------------------------- | ------ | -------- |---------------------------------------------------------|
-| path                              | string | yes      | -                                                       |
-| file_name_expression              | string | no       | "${transactionId}"                                      |
-| file_format                       | string | no       | "text"                                                  |
-| filename_time_format              | string | no       | "yyyy.MM.dd"                                            |
-| field_delimiter                   | string | no       | '\001'                                                  |
-| row_delimiter                     | string | no       | "\n"                                                    |
-| partition_by                      | array  | no       | -                                                       |
-| partition_dir_expression          | string | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"              |
-| is_partition_field_write_in_file  | boolean| no       | false                                                   |
-| sink_columns                      | array  | no       | When this parameter is empty, all fields are sink columns |
-| is_enable_transaction             | boolean| no       | true                                                    |
-| save_mode                         | string | no       | "error"                                                 |
+| name                             | type   | required | default value                                           |
+|----------------------------------| ------ | -------- |---------------------------------------------------------|
+| fs.defaultFS                     | string | yes      | -                                                       |
+| path                             | string | yes      | -                                                       |
+| file_name_expression             | string | no       | "${transactionId}"                                      |
+| file_format                      | string | no       | "text"                                                  |
+| filename_time_format             | string | no       | "yyyy.MM.dd"                                            |
+| field_delimiter                  | string | no       | '\001'                                                  |
+| row_delimiter                    | string | no       | "\n"                                                    |
+| partition_by                     | array  | no       | -                                                       |
+| partition_dir_expression         | string | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"              |
+| is_partition_field_write_in_file | boolean| no       | false                                                   |
+| sink_columns                     | array  | no       | When this parameter is empty, all fields are sink columns |
+| is_enable_transaction            | boolean| no       | true                                                    |
+| save_mode                        | string | no       | "error"                                                 |
+
+### fs.defaultFS [string]
+
+The Hadoop cluster address, starting with `hdfs://`, for example: `hdfs://hadoopcluster`
 
 ### path [string]
 
-The target dir path is required. The `hdfs file` starts with `hdfs://`.
+The target dir path is required.
 
 ### file_name_expression [string]
 
@@ -125,7 +130,8 @@ For text file format
 ```bash
 
 HdfsFile {
-    path="hdfs://mycluster/tmp/hive/warehouse/test2"
+    fs.defaultFS="hdfs://hadoopcluster"
+    path="/tmp/hive/warehouse/test2"
     field_delimiter="\t"
     row_delimiter="\n"
     partition_by=["age"]
@@ -145,7 +151,8 @@ For parquet file format
 ```bash
 
 HdfsFile {
-    path="hdfs://mycluster/tmp/hive/warehouse/test2"
+    fs.defaultFS="hdfs://hadoopcluster"
+    path="/tmp/hive/warehouse/test2"
     partition_by=["age"]
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
@@ -163,7 +170,8 @@ For orc file format
 ```bash
 
 HdfsFile {
-    path="hdfs://mycluster/tmp/hive/warehouse/test2"
+    fs.defaultFS="hdfs://hadoopcluster"
+    path="/tmp/hive/warehouse/test2"
     partition_by=["age"]
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
index 01f23e1fe..a484a6345 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
@@ -17,16 +17,36 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.AbstractFileSink;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
 
 @AutoService(SeaTunnelSink.class)
-public class HdfsFileSink extends AbstractFileSink {
+public class HdfsFileSink extends BaseFileSink {
+
+    @Override
+    public String getPluginName() {
+        return FileSystemType.HDFS.getFileSystemPluginName();
+    }
+
     @Override
-    public SinkFileSystemPlugin getSinkFileSystemPlugin() {
-        return new HdfsFileSinkPlugin();
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, FS_DEFAULT_NAME_KEY);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+        }
+        super.prepare(pluginConfig);
+        hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
     }
 }
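
A minimal sketch (not part of this commit) of the prepare() contract introduced above: a config that omits `fs.defaultFS` is rejected with a `PrepareFailException` before `BaseFileSink.prepare()` runs. `ConfigFactory` comes from the same shaded Typesafe Config package imported in the diff; the wrapper class name and the path value are hypothetical.

```java
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.HdfsFileSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

public class HdfsFileSinkPrepareSketch {
    public static void main(String[] args) {
        // path is present but fs.defaultFS is not, so checkAllExists() fails fast
        Config missingDefaultFs =
                ConfigFactory.parseString("path = \"/tmp/hive/warehouse/test2\"");
        try {
            new HdfsFileSink().prepare(missingDefaultFs);
        } catch (PrepareFailException e) {
            System.out.println("fs.defaultFS is now a required option: " + e.getMessage());
        }
    }
}
```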
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkPlugin.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkPlugin.java
deleted file mode 100644
index 33ffb1f91..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkPlugin.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.filesystem.HdfsFileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.filesystem.HdfsFileSystemCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.writer.HdfsTransactionStateFileWriteFactory;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystemCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
-
-import lombok.NonNull;
-
-import java.util.List;
-import java.util.Optional;
-
-public class HdfsFileSinkPlugin implements SinkFileSystemPlugin {
-    @Override
-    public String getPluginName() {
-        return FileSystemType.HDFS.getFileSystemPluginName();
-    }
-
-    @Override
-    public Optional<TransactionStateFileWriter> getTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
-                                                                              @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
-                                                                              @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
-                                                                              @NonNull List<Integer> sinkColumnsIndexInRow,
-                                                                              @NonNull String tmpPath,
-                                                                              @NonNull String targetPath,
-                                                                              @NonNull String jobId,
-                                                                              int subTaskIndex,
-                                                                              @NonNull String fieldDelimiter,
-                                                                              @NonNull String rowDelimiter,
-                                                                              @NonNull FileSystem fileSystem) {
-        // using factory to generate transaction state file writer
-        TransactionStateFileWriter writer = HdfsTransactionStateFileWriteFactory.of(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fieldDelimiter, rowDelimiter, fileSystem);
-        return Optional.of(writer);
-    }
-
-    @Override
-    public Optional<FileSystemCommitter> getFileSystemCommitter() {
-        return Optional.of(new HdfsFileSystemCommitter());
-    }
-
-    @Override
-    public Optional<FileSystem> getFileSystem() {
-        return Optional.of(new HdfsFileSystem());
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/filesystem/HdfsFileSystem.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/filesystem/HdfsFileSystem.java
deleted file mode 100644
index a70e2a3d6..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/filesystem/HdfsFileSystem.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.filesystem;
-
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class HdfsFileSystem implements FileSystem {
-    @Override
-    public void deleteFile(String path) throws IOException {
-        HdfsUtils.deleteFile(path);
-    }
-
-    @Override
-    public List<String> dirList(String dirPath) throws IOException {
-        List<Path> paths = HdfsUtils.dirList(dirPath);
-        return paths.stream().map(dir -> dir.getName()).collect(Collectors.toList());
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/filesystem/HdfsFileSystemCommitter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/filesystem/HdfsFileSystemCommitter.java
deleted file mode 100644
index ae80856e6..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/filesystem/HdfsFileSystemCommitter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.filesystem;
-
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystemCommitter;
-
-import lombok.NonNull;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class HdfsFileSystemCommitter implements FileSystemCommitter {
-    @Override
-    public void commitTransaction(@NonNull FileAggregatedCommitInfo aggregateCommitInfo) throws IOException {
-        for (Map.Entry<String, Map<String, String>> entry : aggregateCommitInfo.getTransactionMap().entrySet()) {
-            for (Map.Entry<String, String> mvFileEntry : entry.getValue().entrySet()) {
-                HdfsUtils.renameFile(mvFileEntry.getKey(), mvFileEntry.getValue(), true);
-            }
-            // delete the transaction dir
-            HdfsUtils.deleteFile(entry.getKey());
-        }
-    }
-
-    @Override
-    public void abortTransaction(@NonNull FileAggregatedCommitInfo aggregateCommitInfo) throws IOException {
-        for (Map.Entry<String, Map<String, String>> entry : aggregateCommitInfo.getTransactionMap().entrySet()) {
-            // rollback the file
-            for (Map.Entry<String, String> mvFileEntry : entry.getValue().entrySet()) {
-                if (HdfsUtils.fileExist(mvFileEntry.getValue()) && !HdfsUtils.fileExist(mvFileEntry.getKey())) {
-                    HdfsUtils.renameFile(mvFileEntry.getValue(), mvFileEntry.getKey(), true);
-                }
-            }
-            // delete the transaction dir
-            HdfsUtils.deleteFile(entry.getKey());
-        }
-    }
-}
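
For reference, a standalone sketch (not from the repository) of the structure the committer above iterates: the aggregated commit info maps each transaction directory to a map of temporary-file to target-file moves, and commit is rename-then-delete using the plain Hadoop FileSystem API. The paths and cluster address below are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CommitTransactionSketch {
    public static void main(String[] args) throws IOException {
        // transaction dir -> (temporary file -> final target file)
        Map<String, Map<String, String>> transactionMap = new HashMap<>();
        Map<String, String> moves = new HashMap<>();
        moves.put("/tmp/seatunnel/T_1/part-0.txt", "/warehouse/test2/part-0.txt");
        transactionMap.put("/tmp/seatunnel/T_1", moves);

        FileSystem fs = FileSystem.get(URI.create("hdfs://hadoopcluster"), new Configuration());
        for (Map.Entry<String, Map<String, String>> txn : transactionMap.entrySet()) {
            for (Map.Entry<String, String> move : txn.getValue().entrySet()) {
                // move each finished temporary file to its final location
                fs.rename(new Path(move.getKey()), new Path(move.getValue()));
            }
            // drop the whole transaction directory once all files are moved
            fs.delete(new Path(txn.getKey()), true);
        }
    }
}
```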
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/util/HdfsUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/util/HdfsUtils.java
deleted file mode 100644
index 7b5b972ed..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/util/HdfsUtils.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util;
-
-import lombok.NonNull;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-public class HdfsUtils {
-    private static final Logger LOGGER = LoggerFactory.getLogger(HdfsUtils.class);
-
-    public static final int WRITE_BUFFER_SIZE = 2048;
-
-    public static final Configuration CONF = new Configuration();
-
-    // make the configuration object static, so orc and parquet reader can get it
-    static {
-        LOGGER.info(System.getenv("HADOOP_CONF_DIR"));
-        CONF.addResource(new Path(System.getenv("HADOOP_CONF_DIR") + "/core-site.xml"));
-        CONF.addResource(new Path(System.getenv("HADOOP_CONF_DIR") + "/hdfs-site.xml"));
-        CONF.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
-    }
-
-    public static FileSystem getHdfsFs(@NonNull String path)
-            throws IOException {
-        return FileSystem.get(URI.create(path), CONF);
-    }
-
-    public static FSDataOutputStream getOutputStream(@NonNull String outFilePath) throws IOException {
-        FileSystem hdfsFs = getHdfsFs(outFilePath);
-        Path path = new Path(outFilePath);
-        FSDataOutputStream fsDataOutputStream = hdfsFs.create(path, true, WRITE_BUFFER_SIZE);
-        return fsDataOutputStream;
-    }
-
-    public static void createFile(@NonNull String filePath) throws IOException {
-        FileSystem hdfsFs = getHdfsFs(filePath);
-        Path path = new Path(filePath);
-        if (!hdfsFs.createNewFile(path)) {
-            throw new IOException("create file " + filePath + " error");
-        }
-    }
-
-    public static void deleteFile(@NonNull String file) throws IOException {
-        FileSystem hdfsFs = getHdfsFs(file);
-        if (!hdfsFs.delete(new Path(file), true)) {
-            throw new IOException("delete file " + file + " error");
-        }
-    }
-
-    /**
-     * rename file
-     *
-     * @param oldName     old file name
-     * @param newName     target file name
-     * @param rmWhenExist if this is true, we will delete the target file when it already exists
-     * @throws IOException throw IOException
-     */
-    public static void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist) throws IOException {
-        FileSystem hdfsFs = getHdfsFs(newName);
-        LOGGER.info("begin rename file oldName :[" + oldName + "] to newName :[" + newName + "]");
-
-        Path oldPath = new Path(oldName);
-        Path newPath = new Path(newName);
-        if (rmWhenExist) {
-            if (fileExist(newName) && fileExist(oldName)) {
-                hdfsFs.delete(newPath, true);
-            }
-        }
-        if (!fileExist(newName.substring(0, newName.lastIndexOf("/")))) {
-            createDir(newName.substring(0, newName.lastIndexOf("/")));
-        }
-
-        if (hdfsFs.rename(oldPath, newPath)) {
-            LOGGER.info("rename file :[" + oldPath + "] to [" + newPath + "] finish");
-        } else {
-            throw new IOException("rename file :[" + oldPath + "] to [" + newPath + "] error");
-        }
-    }
-
-    public static void createDir(@NonNull String filePath) throws IOException {
-        FileSystem hdfsFs = getHdfsFs(filePath);
-        Path dfs = new Path(filePath);
-        if (!hdfsFs.mkdirs(dfs)) {
-            throw new IOException("create dir " + filePath + " error");
-        }
-    }
-
-    public static boolean fileExist(@NonNull String filePath) throws IOException {
-        FileSystem hdfsFs = getHdfsFs(filePath);
-        Path fileName = new Path(filePath);
-        return hdfsFs.exists(fileName);
-    }
-
-    /**
-     * get the dir in filePath
-     */
-    public static List<Path> dirList(@NonNull String filePath) throws FileNotFoundException, IOException {
-        FileSystem hdfsFs = getHdfsFs(filePath);
-        List<Path> pathList = new ArrayList<Path>();
-        Path fileName = new Path(filePath);
-        FileStatus[] status = hdfsFs.listStatus(fileName);
-        if (status != null && status.length > 0) {
-            for (FileStatus fileStatus : status) {
-                if (fileStatus.isDirectory()) {
-                    pathList.add(fileStatus.getPath());
-                }
-            }
-        }
-        return pathList;
-    }
-}
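
A short usage sketch (not part of the commit; HdfsUtils itself is removed here) of the rename contract documented above: with `rmWhenExist` set to true an already-existing target is deleted first, missing parent directories of the target are created, and the temporary file is then renamed into place. The paths are hypothetical.

```java
import java.io.IOException;

import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;

public class HdfsUtilsRenameSketch {
    public static void main(String[] args) throws IOException {
        // create an empty temporary file, then move it to its final location,
        // overwriting any previous target file with the same name
        HdfsUtils.createFile("/tmp/seatunnel/T_1/part-0.txt");
        HdfsUtils.renameFile("/tmp/seatunnel/T_1/part-0.txt",
                "/warehouse/test2/part-0.txt",
                true);
    }
}
```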
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsJsonTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsJsonTransactionStateFileWriter.java
deleted file mode 100644
index c11d57ce6..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsJsonTransactionStateFileWriter.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.writer;
-
-import org.apache.seatunnel.api.serialization.SerializationSchema;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
-import org.apache.seatunnel.format.json.JsonSerializationSchema;
-
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@Slf4j
-public class HdfsJsonTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
-
-    private static final long serialVersionUID = -5432828969702531646L;
-
-    private final byte[] rowDelimiter;
-    private final SerializationSchema serializationSchema;
-    private Map<String, FSDataOutputStream> beingWrittenOutputStream;
-
-    public HdfsJsonTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
-                                              @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
-                                              @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
-                                              @NonNull List<Integer> sinkColumnsIndexInRow,
-                                              @NonNull String tmpPath,
-                                              @NonNull String targetPath,
-                                              @NonNull String jobId,
-                                              int subTaskIndex,
-                                              @NonNull String rowDelimiter,
-                                              @NonNull FileSystem fileSystem) {
-        super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
-
-        this.rowDelimiter = rowDelimiter.getBytes();
-        this.serializationSchema = new JsonSerializationSchema(seaTunnelRowTypeInfo);
-        beingWrittenOutputStream = new HashMap<>();
-    }
-
-    @Override
-    public void beginTransaction(String transactionId) {
-        this.beingWrittenOutputStream = new HashMap<>();
-    }
-
-    @Override
-    public void abortTransaction(String transactionId) {
-        this.beingWrittenOutputStream = new HashMap<>();
-    }
-
-    @Override
-    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
-        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
-        FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath);
-        try {
-            byte[] rowBytes = serializationSchema.serialize(seaTunnelRow);
-            fsDataOutputStream.write(rowBytes);
-            fsDataOutputStream.write(rowDelimiter);
-        } catch (IOException e) {
-            log.error("write data to file {} error", filePath);
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void finishAndCloseWriteFile() {
-        beingWrittenOutputStream.entrySet().forEach(entry -> {
-            try {
-                entry.getValue().flush();
-            } catch (IOException e) {
-                log.error("error when flush file {}", entry.getKey());
-                throw new RuntimeException(e);
-            } finally {
-                try {
-                    entry.getValue().close();
-                } catch (IOException e) {
-                    log.error("error when close output stream {}", entry.getKey());
-                }
-            }
-
-            needMoveFiles.put(entry.getKey(), getTargetLocation(entry.getKey()));
-        });
-    }
-
-    private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
-        FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath);
-        if (fsDataOutputStream == null) {
-            try {
-                fsDataOutputStream = HdfsUtils.getOutputStream(filePath);
-                beingWrittenOutputStream.put(filePath, fsDataOutputStream);
-            } catch (IOException e) {
-                log.error("can not get output file stream");
-                throw new RuntimeException(e);
-            }
-        }
-        return fsDataOutputStream;
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsOrcTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsOrcTransactionStateFileWriter.java
deleted file mode 100644
index 23648359c..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsOrcTransactionStateFileWriter.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.writer;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
-
-import lombok.NonNull;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.OrcFile;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.Writer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class HdfsOrcTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
-    private static final Logger LOGGER = LoggerFactory.getLogger(HdfsOrcTransactionStateFileWriter.class);
-    private Map<String, Writer> beingWrittenWriter;
-
-    public HdfsOrcTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo, @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
-                                             @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
-                                             @NonNull List<Integer> sinkColumnsIndexInRow,
-                                             @NonNull String tmpPath,
-                                             @NonNull String targetPath,
-                                             @NonNull String jobId,
-                                             int subTaskIndex,
-                                             @NonNull FileSystem fileSystem) {
-        super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
-        this.beingWrittenWriter = new HashMap<>();
-    }
-
-    @Override
-    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
-        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
-        Writer writer = getOrCreateWriter(filePath);
-        TypeDescription schema = buildSchemaWithRowType();
-        VectorizedRowBatch rowBatch = schema.createRowBatch();
-        int i = 0;
-        int row = rowBatch.size++;
-        for (Integer index : sinkColumnsIndexInRow) {
-            Object value = seaTunnelRow.getField(index);
-            ColumnVector vector = rowBatch.cols[i];
-            setColumn(value, vector, row);
-            i++;
-        }
-        try {
-            writer.addRowBatch(rowBatch);
-            rowBatch.reset();
-        } catch (IOException e) {
-            String errorMsg = String.format("Write data to orc file [%s] error", filePath);
-            throw new RuntimeException(errorMsg, e);
-        }
-    }
-
-    @Override
-    public void finishAndCloseWriteFile() {
-        this.beingWrittenWriter.forEach((k, v) -> {
-            try {
-                v.close();
-            } catch (IOException e) {
-                String errorMsg = String.format("Close file [%s] orc writer failed, error msg: [%s]", k, e.getMessage());
-                throw new RuntimeException(errorMsg, e);
-            } catch (NullPointerException e) {
-                // The orc writer does not support being closed more than once; closing it a second time throws a NullPointerException.
-                // A whole file sink run goes through four stages:
-                // 1. beginTransaction 2. prepareCommit 3. commit 4. close
-                // The first stage does not close any writers; from the second stage onwards the writer is closed.
-                // The last stage does not close any writers either,
-                // so the orc writer may be closed one extra time after it has already been closed.
-                LOGGER.info("Close file [{}] orc writer", k);
-            }
-            needMoveFiles.put(k, getTargetLocation(k));
-        });
-    }
-
-    @Override
-    public void beginTransaction(String transactionId) {
-        this.beingWrittenWriter = new HashMap<>();
-    }
-
-    @Override
-    public void abortTransaction(String transactionId) {
-        this.beingWrittenWriter = new HashMap<>();
-    }
-
-    private Writer getOrCreateWriter(@NonNull String filePath) {
-        Writer writer = this.beingWrittenWriter.get(filePath);
-        if (writer == null) {
-            TypeDescription schema = buildSchemaWithRowType();
-            Path path = new Path(filePath);
-            try {
-                OrcFile.WriterOptions options = OrcFile.writerOptions(HdfsUtils.CONF)
-                        .setSchema(schema)
-                        // temporarily used snappy
-                        .compress(CompressionKind.SNAPPY)
-                        // use orc version 0.12
-                        .version(OrcFile.Version.V_0_12)
-                        .overwrite(true);
-                Writer newWriter = OrcFile.createWriter(path, options);
-                this.beingWrittenWriter.put(filePath, newWriter);
-                return newWriter;
-            } catch (IOException e) {
-                String errorMsg = String.format("Get orc writer for file [%s] error", filePath);
-                throw new RuntimeException(errorMsg, e);
-            }
-        }
-        return writer;
-    }
-
-    private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
-        if (BasicType.BOOLEAN_TYPE.equals(type)) {
-            return TypeDescription.createBoolean();
-        }
-        if (BasicType.SHORT_TYPE.equals(type)) {
-            return TypeDescription.createShort();
-        }
-        if (BasicType.INT_TYPE.equals(type)) {
-            return TypeDescription.createInt();
-        }
-        if (BasicType.LONG_TYPE.equals(type)) {
-            return TypeDescription.createLong();
-        }
-        if (BasicType.FLOAT_TYPE.equals(type)) {
-            return TypeDescription.createFloat();
-        }
-        if (BasicType.DOUBLE_TYPE.equals(type)) {
-            return TypeDescription.createDouble();
-        }
-        if (BasicType.BYTE_TYPE.equals(type)) {
-            return TypeDescription.createByte();
-        }
-        return TypeDescription.createString();
-    }
-
-    private TypeDescription buildSchemaWithRowType() {
-        TypeDescription schema = TypeDescription.createStruct();
-        for (Integer i : sinkColumnsIndexInRow) {
-            TypeDescription fieldType = buildFieldWithRowType(seaTunnelRowTypeInfo.getFieldType(i));
-            schema.addField(seaTunnelRowTypeInfo.getFieldName(i), fieldType);
-        }
-        return schema;
-    }
-
-    private void setColumn(Object value, ColumnVector vector, int row) {
-        if (value == null) {
-            vector.isNull[row] = true;
-            vector.noNulls = false;
-        } else {
-            switch (vector.type) {
-                case LONG:
-                    LongColumnVector longVector = (LongColumnVector) vector;
-                    setLongColumnVector(value, longVector, row);
-                    break;
-                case DOUBLE:
-                    DoubleColumnVector doubleColumnVector = (DoubleColumnVector) vector;
-                    setDoubleVector(value, doubleColumnVector, row);
-                    break;
-                case BYTES:
-                    BytesColumnVector bytesColumnVector = (BytesColumnVector) vector;
-                    setByteColumnVector(value, bytesColumnVector, row);
-                    break;
-                default:
-                    throw new RuntimeException("Unexpected ColumnVector subtype");
-            }
-        }
-    }
-
-    private void setLongColumnVector(Object value, LongColumnVector longVector, int row) {
-        if (value instanceof Boolean) {
-            Boolean bool = (Boolean) value;
-            longVector.vector[row] = (bool.equals(Boolean.TRUE)) ? Long.valueOf(1) : Long.valueOf(0);
-        }  else if (value instanceof Integer) {
-            longVector.vector[row] = (Integer) value;
-        } else if (value instanceof Long) {
-            longVector.vector[row] = (Long) value;
-        } else if (value instanceof BigInteger) {
-            BigInteger bigInt = (BigInteger) value;
-            longVector.vector[row] = bigInt.longValue();
-        } else {
-            throw new RuntimeException("Long or Integer type expected for field");
-        }
-    }
-
-    private void setByteColumnVector(Object value, BytesColumnVector bytesColVector, int rowNum) {
-        if (value instanceof byte[] || value instanceof String) {
-            byte[] byteVec;
-            if (value instanceof String) {
-                String strVal = (String) value;
-                byteVec = strVal.getBytes(StandardCharsets.UTF_8);
-            } else {
-                byteVec = (byte[]) value;
-            }
-            bytesColVector.setRef(rowNum, byteVec, 0, byteVec.length);
-        } else {
-            throw new RuntimeException("byte[] or String type expected for field ");
-        }
-    }
-
-    private void setDoubleVector(Object value, DoubleColumnVector doubleVector, int rowNum) {
-        if (value instanceof Double) {
-            doubleVector.vector[rowNum] = (Double) value;
-        } else if (value instanceof Float) {
-            Float floatValue = (Float) value;
-            doubleVector.vector[rowNum] = floatValue.doubleValue();
-        } else {
-            throw new RuntimeException("Double or Float type expected for field ");
-        }
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsParquetTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsParquetTransactionStateFileWriter.java
deleted file mode 100644
index 8b904fda9..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsParquetTransactionStateFileWriter.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.writer;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
-
-import lombok.NonNull;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroParquetWriter;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.hadoop.util.HadoopOutputFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class HdfsParquetTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
-    private static final Logger LOGGER = LoggerFactory.getLogger(HdfsParquetTransactionStateFileWriter.class);
-    private Map<String, ParquetWriter<GenericRecord>> beingWrittenWriter;
-
-    public HdfsParquetTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
-                                                 @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
-                                                 @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
-                                                 @NonNull List<Integer> sinkColumnsIndexInRow, @NonNull String tmpPath,
-                                                 @NonNull String targetPath,
-                                                 @NonNull String jobId,
-                                                 int subTaskIndex,
-                                                 @NonNull FileSystem fileSystem) {
-        super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
-        beingWrittenWriter = new HashMap<>();
-    }
-
-    @Override
-    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
-        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
-        ParquetWriter<GenericRecord> writer = getOrCreateWriter(filePath);
-        Schema schema = buildSchemaWithRowType();
-        GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
-        sinkColumnsIndexInRow.forEach(index -> recordBuilder.set(seaTunnelRowTypeInfo.getFieldName(index), seaTunnelRow.getField(index)));
-        GenericData.Record record = recordBuilder.build();
-        try {
-            writer.write(record);
-        } catch (IOException e) {
-            String errorMsg = String.format("Write data to file [%s] error", filePath);
-            throw new RuntimeException(errorMsg, e);
-        }
-    }
-
-    @Override
-    public void finishAndCloseWriteFile() {
-        this.beingWrittenWriter.forEach((k, v) -> {
-            try {
-                v.close();
-            } catch (IOException e) {
-                String errorMsg = String.format("Close file [%s] parquet writer failed, error msg: [%s]", k, e.getMessage());
-                throw new RuntimeException(errorMsg, e);
-            }
-            needMoveFiles.put(k, getTargetLocation(k));
-        });
-    }
-
-    @Override
-    public void beginTransaction(String transactionId) {
-        this.beingWrittenWriter = new HashMap<>();
-    }
-
-    @Override
-    public void abortTransaction(String transactionId) {
-        this.beingWrittenWriter = new HashMap<>();
-    }
-
-    private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath) {
-        ParquetWriter<GenericRecord> writer = this.beingWrittenWriter.get(filePath);
-        if (writer == null) {
-            Schema schema = buildSchemaWithRowType();
-            Path path = new Path(filePath);
-            try {
-                HadoopOutputFile outputFile = HadoopOutputFile.fromPath(path, HdfsUtils.CONF);
-                ParquetWriter<GenericRecord> newWriter = AvroParquetWriter.<GenericRecord>builder(outputFile)
-                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
-                        // use parquet v1 to improve compatibility
-                        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
-                        // Temporarily use snappy compress
-                        // I think we can use the compress option in config to control this
-                        .withCompressionCodec(CompressionCodecName.SNAPPY)
-                        .withSchema(schema)
-                        .build();
-                this.beingWrittenWriter.put(filePath, newWriter);
-                return newWriter;
-            } catch (IOException e) {
-                String errorMsg = String.format("Get parquet writer for file [%s] error", filePath);
-                throw new RuntimeException(errorMsg, e);
-            }
-        }
-        return writer;
-    }
-
-    private Schema buildSchemaWithRowType() {
-        ArrayList<Schema.Field> fields = new ArrayList<>();
-        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowTypeInfo.getFieldTypes();
-        String[] fieldNames = seaTunnelRowTypeInfo.getFieldNames();
-        sinkColumnsIndexInRow.forEach(index -> {
-            if (BasicType.BOOLEAN_TYPE.equals(fieldTypes[index])) {
-                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.BOOLEAN), null, null);
-                fields.add(field);
-            } else if (BasicType.SHORT_TYPE.equals(fieldTypes[index]) || BasicType.INT_TYPE.equals(fieldTypes[index])) {
-                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.INT), null, null);
-                fields.add(field);
-            } else if (BasicType.LONG_TYPE.equals(fieldTypes[index])) {
-                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.LONG), null, null);
-                fields.add(field);
-            } else if (BasicType.FLOAT_TYPE.equals(fieldTypes[index])) {
-                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.FLOAT), null, null);
-                fields.add(field);
-            } else if (BasicType.DOUBLE_TYPE.equals(fieldTypes[index])) {
-                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.DOUBLE), null, null);
-                fields.add(field);
-            } else if (BasicType.STRING_TYPE.equals(fieldTypes[index])) {
-                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.STRING), null, null);
-                fields.add(field);
-            } else if (BasicType.BYTE_TYPE.equals(fieldTypes[index])) {
-                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.BYTES), null, null);
-                fields.add(field);
-            } else if (BasicType.VOID_TYPE.equals(fieldTypes[index])) {
-                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.NULL), null, null);
-                fields.add(field);
-            }
-        });
-        return Schema.createRecord("SeatunnelRecord",
-                "The record generated by seatunnel file connector",
-                "org.apache.parquet.avro",
-                false,
-                fields);
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsTransactionStateFileWriteFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsTransactionStateFileWriteFactory.java
deleted file mode 100644
index 814491b4f..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsTransactionStateFileWriteFactory.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.writer;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
-
-import lombok.NonNull;
-
-import java.util.List;
-
-public class HdfsTransactionStateFileWriteFactory {
-
-    private HdfsTransactionStateFileWriteFactory() {}
-
-    public static TransactionStateFileWriter of(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
-                                                @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
-                                                @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
-                                                @NonNull List<Integer> sinkColumnsIndexInRow,
-                                                @NonNull String tmpPath,
-                                                @NonNull String targetPath,
-                                                @NonNull String jobId,
-                                                int subTaskIndex,
-                                                @NonNull String fieldDelimiter,
-                                                @NonNull String rowDelimiter,
-                                                @NonNull FileSystem fileSystem) {
-        FileSinkTransactionFileNameGenerator fileSinkTransactionFileNameGenerator = (FileSinkTransactionFileNameGenerator) transactionFileNameGenerator;
-        FileFormat fileFormat = fileSinkTransactionFileNameGenerator.getFileFormat();
-        if (fileFormat.equals(FileFormat.CSV)) {
-            // TODO: once issue #2133 is closed, this will be replaced with a dedicated CSV writer
-            return new HdfsTxtTransactionStateFileWriter(
-                    seaTunnelRowTypeInfo,
-                    transactionFileNameGenerator,
-                    partitionDirNameGenerator,
-                    sinkColumnsIndexInRow,
-                    tmpPath,
-                    targetPath,
-                    jobId,
-                    subTaskIndex,
-                    fieldDelimiter,
-                    rowDelimiter,
-                    fileSystem);
-        }
-        if (fileFormat.equals(FileFormat.PARQUET)) {
-            return new HdfsParquetTransactionStateFileWriter(
-                    seaTunnelRowTypeInfo,
-                    transactionFileNameGenerator,
-                    partitionDirNameGenerator,
-                    sinkColumnsIndexInRow,
-                    tmpPath,
-                    targetPath,
-                    jobId,
-                    subTaskIndex,
-                    fileSystem);
-        }
-        if (fileFormat.equals(FileFormat.ORC)) {
-            return new HdfsOrcTransactionStateFileWriter(
-                    seaTunnelRowTypeInfo,
-                    transactionFileNameGenerator,
-                    partitionDirNameGenerator,
-                    sinkColumnsIndexInRow,
-                    tmpPath,
-                    targetPath,
-                    jobId,
-                    subTaskIndex,
-                    fileSystem);
-        }
-        if (fileFormat.equals(FileFormat.JSON)) {
-            return new HdfsJsonTransactionStateFileWriter(
-                    seaTunnelRowTypeInfo,
-                    transactionFileNameGenerator,
-                    partitionDirNameGenerator,
-                    sinkColumnsIndexInRow,
-                    tmpPath,
-                    targetPath,
-                    jobId,
-                    subTaskIndex,
-                    rowDelimiter,
-                    fileSystem);
-        }
-        // if the file type is not supported by the file connector, fall back to the default txt writer
-        return new HdfsTxtTransactionStateFileWriter(
-                    seaTunnelRowTypeInfo,
-                    transactionFileNameGenerator,
-                    partitionDirNameGenerator,
-                    sinkColumnsIndexInRow,
-                    tmpPath,
-                    targetPath,
-                    jobId,
-                    subTaskIndex,
-                    fieldDelimiter,
-                    rowDelimiter,
-                    fileSystem);
-    }
-}
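For reference, the removed factory selected a concrete writer implementation by dispatching on the FileFormat carried by the transaction file name generator. A minimal usage sketch, with imports omitted for brevity; the schema, generator arguments, paths, and delimiters below are illustrative values mirroring the test further down in this diff, not taken from the connector itself:

    SeaTunnelRowType rowType = new SeaTunnelRowType(
            new String[]{"c1", "c2", "c3", "c4"},
            new SeaTunnelDataType[]{BasicType.BOOLEAN_TYPE, BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE});
    List<Integer> sinkColumnsIndexInRow = Arrays.asList(0, 1);      // columns actually written to the file
    List<String> partitionFieldList = Arrays.asList("c3", "c4");    // partition columns
    List<Integer> partitionFieldIndexInRow = Arrays.asList(2, 3);   // their indexes in the row

    TransactionStateFileWriter writer = HdfsTransactionStateFileWriteFactory.of(
            rowType,
            new FileSinkTransactionFileNameGenerator(FileFormat.TEXT, null, "yyyy.MM.dd"),
            new FileSinkPartitionDirNameGenerator(partitionFieldList, partitionFieldIndexInRow, "${k0}=${v0}/${k1}=${v1}"),
            sinkColumnsIndexInRow,
            "/tmp/seatunnel",                             // tmpPath: transaction working directory
            "/tmp/hive/warehouse/seatunnel.db/test1",     // targetPath: final output directory
            String.valueOf(System.currentTimeMillis()),   // jobId
            0,                                            // subTaskIndex
            String.valueOf('\001'),                       // fieldDelimiter, only used by the text/csv path
            "\n",                                         // rowDelimiter, used by the text/csv and json paths
            new HdfsFileSystem());                        // FileSystem SPI implementation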
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsTxtTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsTxtTransactionStateFileWriter.java
deleted file mode 100644
index 2ee792a10..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsTxtTransactionStateFileWriter.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.writer;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
-
-import lombok.NonNull;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class HdfsTxtTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
-    private static final Logger LOGGER = LoggerFactory.getLogger(HdfsTxtTransactionStateFileWriter.class);
-    private Map<String, FSDataOutputStream> beingWrittenOutputStream;
-
-    private String fieldDelimiter;
-    private String rowDelimiter;
-
-    public HdfsTxtTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
-                                             @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
-                                             @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
-                                             @NonNull List<Integer> sinkColumnsIndexInRow,
-                                             @NonNull String tmpPath,
-                                             @NonNull String targetPath,
-                                             @NonNull String jobId,
-                                             int subTaskIndex,
-                                             @NonNull String fieldDelimiter,
-                                             @NonNull String rowDelimiter,
-                                             @NonNull FileSystem fileSystem) {
-        super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
-
-        this.fieldDelimiter = fieldDelimiter;
-        this.rowDelimiter = rowDelimiter;
-        beingWrittenOutputStream = new HashMap<>();
-    }
-
-    @Override
-    public void beginTransaction(String transactionId) {
-        this.beingWrittenOutputStream = new HashMap<>();
-    }
-
-    @Override
-    public void abortTransaction(String transactionId) {
-        this.beingWrittenOutputStream = new HashMap<>();
-    }
-
-    @Override
-    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
-        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
-        FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath);
-        String line = transformRowToLine(seaTunnelRow);
-        try {
-            fsDataOutputStream.write(line.getBytes());
-            fsDataOutputStream.write(rowDelimiter.getBytes());
-        } catch (IOException e) {
-            LOGGER.error("write data to file {} error", filePath);
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void finishAndCloseWriteFile() {
-        beingWrittenOutputStream.entrySet().forEach(entry -> {
-            try {
-                entry.getValue().flush();
-            } catch (IOException e) {
-                LOGGER.error("error when flush file {}", entry.getKey());
-                throw new RuntimeException(e);
-            } finally {
-                try {
-                    entry.getValue().close();
-                } catch (IOException e) {
-                    LOGGER.error("error when close output stream {}", entry.getKey());
-                }
-            }
-
-            needMoveFiles.put(entry.getKey(), getTargetLocation(entry.getKey()));
-        });
-    }
-
-    private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
-        FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath);
-        if (fsDataOutputStream == null) {
-            try {
-                fsDataOutputStream = HdfsUtils.getOutputStream(filePath);
-                beingWrittenOutputStream.put(filePath, fsDataOutputStream);
-            } catch (IOException e) {
-                LOGGER.error("can not get output file stream");
-                throw new RuntimeException(e);
-            }
-        }
-        return fsDataOutputStream;
-    }
-
-    private String transformRowToLine(@NonNull SeaTunnelRow seaTunnelRow) {
-        return this.sinkColumnsIndexInRow.stream().map(index -> seaTunnelRow.getFields()[index] == null ? "" : seaTunnelRow.getFields()[index].toString()).collect(Collectors.joining(fieldDelimiter));
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/FileSinkAggregatedCommitterTest.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/FileSinkAggregatedCommitterTest.java
deleted file mode 100644
index e76ee1d43..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/FileSinkAggregatedCommitterTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
-
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.filesystem.HdfsFileSystemCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileSinkAggregatedCommitter;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.EnabledOnOs;
-import org.junit.jupiter.api.condition.OS;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.stream.Collectors;
-
-@EnabledOnOs(value = {OS.MAC, OS.LINUX})
-public class FileSinkAggregatedCommitterTest {
-    @SuppressWarnings("checkstyle:UnnecessaryParentheses")
-    @Test
-    public void testCommit() throws Exception {
-        FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(new HdfsFileSystemCommitter());
-        Map<String, Map<String, String>> transactionFiles = new HashMap<>();
-        Random random = new Random();
-        Long jobId = random.nextLong();
-        String transactionDir = String.format("/tmp/seatunnel/seatunnel/%s/T_%s_0_1", jobId, jobId);
-        String targetDir = String.format("/tmp/hive/warehouse/%s", jobId);
-        Map<String, String> needMoveFiles = new HashMap<>();
-        needMoveFiles.put(transactionDir + "/c3=4/c4=rrr/test1.txt", targetDir + "/c3=4/c4=rrr/test1.txt");
-        needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir + "/c3=4/c4=bbb/test1.txt");
-        HdfsUtils.createFile(transactionDir + "/c3=4/c4=rrr/test1.txt");
-        HdfsUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
-
-        transactionFiles.put(transactionDir, needMoveFiles);
-
-        Map<String, List<String>> partitionDirAndVals = new HashMap<>();
-        partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new String[]{"4", "rrr"})).collect(Collectors.toList()));
-        partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList()));
-
-        FileAggregatedCommitInfo fileAggregatedCommitInfo = new FileAggregatedCommitInfo(transactionFiles, partitionDirAndVals);
-        List<FileAggregatedCommitInfo> fileAggregatedCommitInfoList = new ArrayList<>();
-        fileAggregatedCommitInfoList.add(fileAggregatedCommitInfo);
-        fileSinkAggregatedCommitter.commit(fileAggregatedCommitInfoList);
-
-        Assertions.assertTrue(HdfsUtils.fileExist(targetDir + "/c3=4/c4=bbb/test1.txt"));
-        Assertions.assertTrue(HdfsUtils.fileExist(targetDir + "/c3=4/c4=rrr/test1.txt"));
-        Assertions.assertTrue(!HdfsUtils.fileExist(transactionDir));
-    }
-
-    @SuppressWarnings("checkstyle:UnnecessaryParentheses")
-    @Test
-    public void testCombine() throws Exception {
-        FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(new HdfsFileSystemCommitter());
-        Map<String, Map<String, String>> transactionFiles = new HashMap<>();
-        Random random = new Random();
-        Long jobId = random.nextLong();
-        String transactionDir = String.format("/tmp/seatunnel/seatunnel/%s/T_%s_0_1", jobId, jobId);
-        String targetDir = String.format("/tmp/hive/warehouse/%s", jobId);
-        Map<String, String> needMoveFiles = new HashMap<>();
-        needMoveFiles.put(transactionDir + "/c3=3/c4=rrr/test1.txt", targetDir + "/c3=3/c4=rrr/test1.txt");
-        needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir + "/c3=4/c4=bbb/test1.txt");
-        Map<String, List<String>> partitionDirAndVals = new HashMap<>();
-        partitionDirAndVals.put("/c3=3/c4=rrr", Arrays.stream((new String[]{"3", "rrr"})).collect(Collectors.toList()));
-        partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList()));
-        FileCommitInfo fileCommitInfo = new FileCommitInfo(needMoveFiles, partitionDirAndVals, transactionDir);
-        HdfsUtils.createFile(transactionDir + "/c3=3/c4=rrr/test1.txt");
-        HdfsUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
-
-        Map<String, String> needMoveFiles1 = new HashMap<>();
-        needMoveFiles1.put(transactionDir + "/c3=4/c4=rrr/test2.txt", targetDir + "/c3=4/c4=rrr/test2.txt");
-        needMoveFiles1.put(transactionDir + "/c3=4/c4=bbb/test2.txt", targetDir + "/c3=4/c4=bbb/test2.txt");
-        Map<String, List<String>> partitionDirAndVals1 = new HashMap<>();
-        partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new String[]{"4", "rrr"})).collect(Collectors.toList()));
-        partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList()));
-        FileCommitInfo fileCommitInfo1 = new FileCommitInfo(needMoveFiles1, partitionDirAndVals1, transactionDir);
-        List<FileCommitInfo> fileCommitInfoList = new ArrayList<>();
-        fileCommitInfoList.add(fileCommitInfo);
-        fileCommitInfoList.add(fileCommitInfo1);
-
-        FileAggregatedCommitInfo combine = fileSinkAggregatedCommitter.combine(fileCommitInfoList);
-        Assertions.assertEquals(1, combine.getTransactionMap().size());
-        Assertions.assertEquals(4, combine.getTransactionMap().get(transactionDir).size());
-        Assertions.assertEquals(targetDir + "/c3=3/c4=rrr/test1.txt", combine.getTransactionMap().get(transactionDir).get(transactionDir + "/c3=3/c4=rrr/test1.txt"));
-        Assertions.assertEquals(targetDir + "/c3=4/c4=bbb/test1.txt", combine.getTransactionMap().get(transactionDir).get(transactionDir + "/c3=4/c4=bbb/test1.txt"));
-        Assertions.assertEquals(targetDir + "/c3=4/c4=rrr/test2.txt", combine.getTransactionMap().get(transactionDir).get(transactionDir + "/c3=4/c4=rrr/test2.txt"));
-        Assertions.assertEquals(targetDir + "/c3=4/c4=bbb/test2.txt", combine.getTransactionMap().get(transactionDir).get(transactionDir + "/c3=4/c4=bbb/test2.txt"));
-        Assertions.assertEquals(3, combine.getPartitionDirAndValsMap().keySet().size());
-    }
-
-    @SuppressWarnings("checkstyle:UnnecessaryParentheses")
-    @Test
-    public void testAbort() throws Exception {
-        FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(new HdfsFileSystemCommitter());
-        Map<String, Map<String, String>> transactionFiles = new HashMap<>();
-        Random random = new Random();
-        Long jobId = random.nextLong();
-        String transactionDir = String.format("/tmp/seatunnel/seatunnel/%s/T_%s_0_1", jobId, jobId);
-        String targetDir = String.format("/tmp/hive/warehouse/%s", jobId);
-        Map<String, String> needMoveFiles = new HashMap<>();
-        needMoveFiles.put(transactionDir + "/c3=4/c4=rrr/test1.txt", targetDir + "/c3=4/c4=rrr/test1.txt");
-        needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir + "/c3=4/c4=bbb/test1.txt");
-        Map<String, List<String>> partitionDirAndVals = new HashMap<>();
-        partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new String[]{"4", "rrr"})).collect(Collectors.toList()));
-        partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList()));
-        HdfsUtils.createFile(transactionDir + "/c3=4/c4=rrr/test1.txt");
-        HdfsUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
-
-        transactionFiles.put(transactionDir, needMoveFiles);
-        FileAggregatedCommitInfo fileAggregatedCommitInfo = new FileAggregatedCommitInfo(transactionFiles, partitionDirAndVals);
-        List<FileAggregatedCommitInfo> fileAggregatedCommitInfoList = new ArrayList<>();
-        fileAggregatedCommitInfoList.add(fileAggregatedCommitInfo);
-        fileSinkAggregatedCommitter.commit(fileAggregatedCommitInfoList);
-
-        Assertions.assertTrue(HdfsUtils.fileExist(targetDir + "/c3=4/c4=bbb/test1.txt"));
-        Assertions.assertTrue(HdfsUtils.fileExist(targetDir + "/c3=4/c4=rrr/test1.txt"));
-        Assertions.assertTrue(!HdfsUtils.fileExist(transactionDir));
-
-        fileSinkAggregatedCommitter.abort(fileAggregatedCommitInfoList);
-        Assertions.assertTrue(!HdfsUtils.fileExist(targetDir + "/c3=4/c4=bbb/test1.txt"));
-        Assertions.assertTrue(!HdfsUtils.fileExist(targetDir + "/c3=4/c4=rrr/test1.txt"));
-
-        // transactionDir will be deleted when abort is called
-        Assertions.assertTrue(!HdfsUtils.fileExist(transactionDir));
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/TestHdfsTxtTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/TestHdfsTxtTransactionStateFileWriter.java
deleted file mode 100644
index 88ca2b9f4..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/TestHdfsTxtTransactionStateFileWriter.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.filesystem.HdfsFileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.writer.HdfsTxtTransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkPartitionDirNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.EnabledOnOs;
-import org.junit.jupiter.api.condition.OS;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-@EnabledOnOs(value = {OS.MAC, OS.LINUX})
-public class TestHdfsTxtTransactionStateFileWriter {
-
-    @SuppressWarnings("checkstyle:MagicNumber")
-    @Test
-    public void testHdfsTextTransactionStateFileWriter() throws Exception {
-        String[] fieldNames = new String[]{"c1", "c2", "c3", "c4"};
-        SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[]{BasicType.BOOLEAN_TYPE, BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE};
-        SeaTunnelRowType seaTunnelRowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
-
-        List<Integer> sinkColumnIndexInRow = new ArrayList<>();
-        sinkColumnIndexInRow.add(0);
-        sinkColumnIndexInRow.add(1);
-
-        List<String> hivePartitionFieldList = new ArrayList<>();
-        hivePartitionFieldList.add("c3");
-        hivePartitionFieldList.add("c4");
-
-        List<Integer> partitionFieldIndexInRow = new ArrayList<>();
-        partitionFieldIndexInRow.add(2);
-        partitionFieldIndexInRow.add(3);
-
-        String jobId = System.currentTimeMillis() + "";
-        String targetPath = "/tmp/hive/warehouse/seatunnel.db/test1";
-        String tmpPath = "/tmp/seatunnel";
-
-        TransactionStateFileWriter fileWriter = new HdfsTxtTransactionStateFileWriter(seaTunnelRowTypeInfo,
-            new FileSinkTransactionFileNameGenerator(FileFormat.TEXT, null, "yyyy.MM.dd"),
-            new FileSinkPartitionDirNameGenerator(hivePartitionFieldList, partitionFieldIndexInRow, "${k0}=${v0}/${k1}=${v1}"),
-            sinkColumnIndexInRow,
-            tmpPath,
-            targetPath,
-            jobId,
-            0,
-            String.valueOf('\001'),
-            "\n",
-            new HdfsFileSystem());
-
-        String transactionId = fileWriter.beginTransaction(1L);
-
-        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{true, 1, "str1", "str2"});
-        fileWriter.write(seaTunnelRow);
-
-        SeaTunnelRow seaTunnelRow1 = new SeaTunnelRow(new Object[]{true, 1, "str1", "str3"});
-        fileWriter.write(seaTunnelRow1);
-
-        Optional<FileCommitInfo> fileCommitInfoOptional = fileWriter.prepareCommit();
-        // check the prepared commit info: transaction dir, files to move, and partition dirs/values
-        Assertions.assertTrue(fileCommitInfoOptional.isPresent());
-        FileCommitInfo fileCommitInfo = fileCommitInfoOptional.get();
-        String transactionDir = tmpPath + "/seatunnel/" + jobId + "/" + transactionId;
-        Assertions.assertEquals(transactionDir, fileCommitInfo.getTransactionDir());
-        Assertions.assertEquals(2, fileCommitInfo.getNeedMoveFiles().size());
-        Map<String, String> needMoveFiles = fileCommitInfo.getNeedMoveFiles();
-        Assertions.assertEquals(targetPath + "/c3=str1/c4=str2/" + transactionId + ".txt", needMoveFiles.get(transactionDir + "/c3=str1/c4=str2/" + transactionId + ".txt"));
-        Assertions.assertEquals(targetPath + "/c3=str1/c4=str3/" + transactionId + ".txt", needMoveFiles.get(transactionDir + "/c3=str1/c4=str3/" + transactionId + ".txt"));
-
-        Map<String, List<String>> partitionDirAndValsMap = fileCommitInfo.getPartitionDirAndValsMap();
-        Assertions.assertEquals(2, partitionDirAndValsMap.size());
-        Assertions.assertTrue(partitionDirAndValsMap.keySet().contains("c3=str1/c4=str2"));
-        Assertions.assertTrue(partitionDirAndValsMap.keySet().contains("c3=str1/c4=str3"));
-        Assertions.assertTrue(partitionDirAndValsMap.get("c3=str1/c4=str2").size() == 2);
-        Assertions.assertEquals("str1", partitionDirAndValsMap.get("c3=str1/c4=str2").get(0));
-        Assertions.assertEquals("str2", partitionDirAndValsMap.get("c3=str1/c4=str2").get(1));
-        Assertions.assertEquals("str1", partitionDirAndValsMap.get("c3=str1/c4=str3").get(0));
-        Assertions.assertEquals("str3", partitionDirAndValsMap.get("c3=str1/c4=str3").get(1));
-    }
-}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java
index 50d7c34d2..d6bfdefc5 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java
@@ -53,31 +53,4 @@ public class FakeSourceToFileIT extends FlinkContainer {
         Container.ExecResult execResult = executeSeaTunnelFlinkJob("/file/fakesource_to_local_json.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
     }
-
-    /**
-     * fake source -> hdfs text file sink
-     */
-    @Test
-    public void testFakeSourceToHdfsFileText() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/file/fakesource_to_hdfs_text.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-    }
-
-    /**
-     * fake source -> hdfs parquet file sink
-     */
-    @Test
-    public void testFakeSourceToHdfsFileParquet() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/file/fakesource_to_hdfs_parquet.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-    }
-
-    /**
-     * fake source -> hdfs json file sink
-     */
-    @Test
-    public void testFakeSourceToHdfsFileJson() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/file/fakesource_to_hdfs_json.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-    }
 }
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
deleted file mode 100644
index 269b85d08..000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of batch processing in SeaTunnel config
-######
-
-env {
-  # You can set flink configuration here
-  execution.parallelism = 1
-  job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
-}
-
-source {
-  FakeSource {
-    result_table_name = "fake"
-    schema = {
-      fields {
-        name = "string"
-        age = "int"
-      }
-    }
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
-}
-
-transform {
-  sql {
-    sql = "select name,age from fake"
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/transform/sql
-}
-
-sink {
-  HdfsFile {
-    path="/tmp/hive/warehouse/test2"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="json"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
-    save_mode="error"
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
deleted file mode 100644
index 5e1ea5c01..000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
+++ /dev/null
@@ -1,71 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of batch processing in SeaTunnel config
-######
-
-env {
-  # You can set flink configuration here
-  execution.parallelism = 1
-  job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
-}
-
-source {
-  FakeSource {
-    result_table_name = "fake"
-    schema = {
-      fields {
-        name = "string"
-        age = "int"
-      }
-    }
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
-}
-
-transform {
-  sql {
-    sql = "select name,age from fake"
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/transform/sql
-}
-
-sink {
-  HdfsFile {
-    path="/tmp/hive/warehouse/test2"
-    field_delimiter="\t"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="parquet"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
-    save_mode="error"
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
deleted file mode 100644
index d4a8a745c..000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
+++ /dev/null
@@ -1,71 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of batch processing in SeaTunnel config
-######
-
-env {
-  # You can set flink configuration here
-  execution.parallelism = 1
-  job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
-}
-
-source {
-  FakeSource {
-    result_table_name = "fake"
-    schema = {
-      fields {
-        name = "string"
-        age = "int"
-      }
-    }
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
-}
-
-transform {
-  sql {
-    sql = "select name,age from fake"
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/transform/sql
-}
-
-sink {
-  HdfsFile {
-    path="/tmp/hive/warehouse/test2"
-    field_delimiter="\t"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="text"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
-    save_mode="error"
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
index fccd8db06..b2535ebf2 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
@@ -57,32 +57,4 @@ public class FakeSourceToFileIT extends SparkContainer {
         Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_local_json.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
     }
-
-
-    /**
-     * fake source -> hdfs text file sink
-     */
-    @Test
-    public void testFakeSourceToHdfsFileText() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_hdfs_text.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-    }
-
-    /**
-     * fake source -> hdfs parquet file sink
-     */
-    @Test
-    public void testFakeSourceToHdfsFileParquet() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_hdfs_parquet.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-    }
-
-    /**
-     * fake source -> hdfs json file sink
-     */
-    @Test
-    public void testFakeSourceToHdfsFileJson() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_hdfs_json.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-    }
 }
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
deleted file mode 100644
index 40454bce0..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
+++ /dev/null
@@ -1,69 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
-  # You can set spark configuration here
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-  job.mode = "BATCH"
-}
-
-source {
-  FakeSource {
-    result_table_name = "fake"
-    schema = {
-      fields {
-        name = "string"
-        age = "int"
-      }
-    }
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
-}
-
-transform {
-  sql {
-    sql = "select name,age from fake"
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/category/transform
-}
-
-sink {
-  HdfsFile {
-    path="/tmp/hive/warehouse/test2"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="json"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
-    save_mode="error"
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
deleted file mode 100644
index 550990eea..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
-  # You can set spark configuration here
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-  job.mode = "BATCH"
-}
-
-source {
-  FakeSource {
-    result_table_name = "fake"
-    schema = {
-      fields {
-        name = "string"
-        age = "int"
-      }
-    }
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
-}
-
-transform {
-  sql {
-    sql = "select name,age from fake"
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/category/transform
-}
-
-sink {
-  HdfsFile {
-    path="/tmp/hive/warehouse/test2"
-    field_delimiter="\t"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="parquet"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
-    save_mode="error"
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
deleted file mode 100644
index 2bf6afba6..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
-  # You can set spark configuration here
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-  job.mode = "BATCH"
-}
-
-source {
-  FakeSource {
-    result_table_name = "fake"
-    schema = {
-      fields {
-        name = "string"
-        age = "int"
-      }
-    }
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
-}
-
-transform {
-  sql {
-    sql = "select name,age from fake"
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/category/transform
-}
-
-sink {
-  HdfsFile {
-    path="/tmp/hive/warehouse/test2"
-    field_delimiter="\t"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="text"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
-    save_mode="error"
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
-}
\ No newline at end of file