Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/09/15 11:47:07 UTC
[incubator-seatunnel] branch dev updated: [Improve][Connector-V2] Refactor hdfs file sink connector code structure (#2701)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6129c0256 [Improve][Connector-V2] Refactor hdfs file sink connector code structure (#2701)
6129c0256 is described below
commit 6129c025678ec25ee155d89084f52d324b9f364a
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Thu Sep 15 19:47:01 2022 +0800
[Improve][Connector-V2] Refactor hdfs file sink connector code structure (#2701)
* [Improve][Connector-V2] Refactor hdfs file sink codes
---
docs/en/connector-v2/sink/HdfsFile.md | 44 ++--
.../seatunnel/file/hdfs/sink/HdfsFileSink.java | 30 ++-
.../file/hdfs/sink/HdfsFileSinkPlugin.java | 69 ------
.../file/hdfs/sink/filesystem/HdfsFileSystem.java | 40 ----
.../sink/filesystem/HdfsFileSystemCommitter.java | 54 -----
.../seatunnel/file/hdfs/sink/util/HdfsUtils.java | 138 ------------
.../writer/HdfsJsonTransactionStateFileWriter.java | 122 -----------
.../writer/HdfsOrcTransactionStateFileWriter.java | 244 ---------------------
.../HdfsParquetTransactionStateFileWriter.java | 169 --------------
.../HdfsTransactionStateFileWriteFactory.java | 115 ----------
.../writer/HdfsTxtTransactionStateFileWriter.java | 125 -----------
.../hdfs/sink/FileSinkAggregatedCommitterTest.java | 147 -------------
.../TestHdfsTxtTransactionStateFileWriter.java | 109 ---------
.../e2e/flink/v2/file/FakeSourceToFileIT.java | 27 ---
.../resources/file/fakesource_to_hdfs_json.conf | 70 ------
.../resources/file/fakesource_to_hdfs_parquet.conf | 71 ------
.../resources/file/fakesource_to_hdfs_text.conf | 71 ------
.../e2e/spark/v2/file/FakeSourceToFileIT.java | 28 ---
.../resources/file/fakesource_to_hdfs_json.conf | 69 ------
.../resources/file/fakesource_to_hdfs_parquet.conf | 70 ------
.../resources/file/fakesource_to_hdfs_text.conf | 70 ------
21 files changed, 51 insertions(+), 1831 deletions(-)
diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index e2e3b7561..928156760 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -24,24 +24,29 @@ By default, we use 2PC commit to ensure `exactly-once`
To use this connector, you must ensure your Spark/Flink cluster has already integrated Hadoop. The tested Hadoop version is 2.x.
-| name | type | required | default value |
-| --------------------------------- | ------ | -------- |---------------------------------------------------------|
-| path | string | yes | - |
-| file_name_expression | string | no | "${transactionId}" |
-| file_format | string | no | "text" |
-| filename_time_format | string | no | "yyyy.MM.dd" |
-| field_delimiter | string | no | '\001' |
-| row_delimiter | string | no | "\n" |
-| partition_by | array | no | - |
-| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" |
-| is_partition_field_write_in_file | boolean| no | false |
-| sink_columns | array | no | When this parameter is empty, all fields are sink columns |
-| is_enable_transaction | boolean| no | true |
-| save_mode | string | no | "error" |
+| name | type | required | default value |
+|----------------------------------| ------ | -------- |---------------------------------------------------------|
+| fs.defaultFS | string | yes | - |
+| path | string | yes | - |
+| file_name_expression | string | no | "${transactionId}" |
+| file_format | string | no | "text" |
+| filename_time_format | string | no | "yyyy.MM.dd" |
+| field_delimiter | string | no | '\001' |
+| row_delimiter | string | no | "\n" |
+| partition_by | array | no | - |
+| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" |
+| is_partition_field_write_in_file | boolean| no | false |
+| sink_columns | array | no | When this parameter is empty, all fields are sink columns |
+| is_enable_transaction | boolean| no | true |
+| save_mode | string | no | "error" |
+
+### fs.defaultFS [string]
+
+The Hadoop cluster address, which starts with `hdfs://`, for example: `hdfs://hadoopcluster`
### path [string]
-The target dir path is required. The `hdfs file` starts with `hdfs://`.
+The target dir path is required.
### file_name_expression [string]
@@ -125,7 +130,8 @@ For text file format
```bash
HdfsFile {
- path="hdfs://mycluster/tmp/hive/warehouse/test2"
+ fs.defaultFS="hdfs://hadoopcluster"
+ path="/tmp/hive/warehouse/test2"
field_delimiter="\t"
row_delimiter="\n"
partition_by=["age"]
@@ -145,7 +151,8 @@ For parquet file format
```bash
HdfsFile {
- path="hdfs://mycluster/tmp/hive/warehouse/test2"
+ fs.defaultFS="hdfs://hadoopcluster"
+ path="/tmp/hive/warehouse/test2"
partition_by=["age"]
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
@@ -163,7 +170,8 @@ For orc file format
```bash
HdfsFile {
- path="hdfs://mycluster/tmp/hive/warehouse/test2"
+ fs.defaultFS="hdfs://hadoopcluster"
+ path="/tmp/hive/warehouse/test2"
partition_by=["age"]
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
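With the new option set, only `fs.defaultFS` and `path` are required; the other options fall back to the defaults in the table above. A minimal sketch of such a sink block (the cluster address and path are placeholders taken from the examples above):

```bash
HdfsFile {
    # required: the Hadoop cluster address
    fs.defaultFS="hdfs://hadoopcluster"
    # required: target directory inside that cluster
    path="/tmp/hive/warehouse/test2"
    # file_format defaults to "text", save_mode to "error", etc.
}
```

If `fs.defaultFS` is omitted, the sink now fails during `prepare()` before any data is written, as shown in the `HdfsFileSink` change below.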
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
index 01f23e1fe..a484a6345 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
@@ -17,16 +17,36 @@
package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.AbstractFileSink;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
@AutoService(SeaTunnelSink.class)
-public class HdfsFileSink extends AbstractFileSink {
+public class HdfsFileSink extends BaseFileSink {
+
+ @Override
+ public String getPluginName() {
+ return FileSystemType.HDFS.getFileSystemPluginName();
+ }
+
@Override
- public SinkFileSystemPlugin getSinkFileSystemPlugin() {
- return new HdfsFileSinkPlugin();
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, FS_DEFAULT_NAME_KEY);
+ if (!result.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+ }
+ super.prepare(pluginConfig);
+ hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkPlugin.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkPlugin.java
deleted file mode 100644
index 33ffb1f91..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkPlugin.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.filesystem.HdfsFileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.filesystem.HdfsFileSystemCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.writer.HdfsTransactionStateFileWriteFactory;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystemCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
-
-import lombok.NonNull;
-
-import java.util.List;
-import java.util.Optional;
-
-public class HdfsFileSinkPlugin implements SinkFileSystemPlugin {
- @Override
- public String getPluginName() {
- return FileSystemType.HDFS.getFileSystemPluginName();
- }
-
- @Override
- public Optional<TransactionStateFileWriter> getTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
- @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
- @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
- @NonNull List<Integer> sinkColumnsIndexInRow,
- @NonNull String tmpPath,
- @NonNull String targetPath,
- @NonNull String jobId,
- int subTaskIndex,
- @NonNull String fieldDelimiter,
- @NonNull String rowDelimiter,
- @NonNull FileSystem fileSystem) {
- // using factory to generate transaction state file writer
- TransactionStateFileWriter writer = HdfsTransactionStateFileWriteFactory.of(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fieldDelimiter, rowDelimiter, fileSystem);
- return Optional.of(writer);
- }
-
- @Override
- public Optional<FileSystemCommitter> getFileSystemCommitter() {
- return Optional.of(new HdfsFileSystemCommitter());
- }
-
- @Override
- public Optional<FileSystem> getFileSystem() {
- return Optional.of(new HdfsFileSystem());
- }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/filesystem/HdfsFileSystem.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/filesystem/HdfsFileSystem.java
deleted file mode 100644
index a70e2a3d6..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/filesystem/HdfsFileSystem.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.filesystem;
-
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class HdfsFileSystem implements FileSystem {
- @Override
- public void deleteFile(String path) throws IOException {
- HdfsUtils.deleteFile(path);
- }
-
- @Override
- public List<String> dirList(String dirPath) throws IOException {
- List<Path> paths = HdfsUtils.dirList(dirPath);
- return paths.stream().map(dir -> dir.getName()).collect(Collectors.toList());
- }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/filesystem/HdfsFileSystemCommitter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/filesystem/HdfsFileSystemCommitter.java
deleted file mode 100644
index ae80856e6..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/filesystem/HdfsFileSystemCommitter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.filesystem;
-
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystemCommitter;
-
-import lombok.NonNull;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class HdfsFileSystemCommitter implements FileSystemCommitter {
- @Override
- public void commitTransaction(@NonNull FileAggregatedCommitInfo aggregateCommitInfo) throws IOException {
- for (Map.Entry<String, Map<String, String>> entry : aggregateCommitInfo.getTransactionMap().entrySet()) {
- for (Map.Entry<String, String> mvFileEntry : entry.getValue().entrySet()) {
- HdfsUtils.renameFile(mvFileEntry.getKey(), mvFileEntry.getValue(), true);
- }
- // delete the transaction dir
- HdfsUtils.deleteFile(entry.getKey());
- }
- }
-
- @Override
- public void abortTransaction(@NonNull FileAggregatedCommitInfo aggregateCommitInfo) throws IOException {
- for (Map.Entry<String, Map<String, String>> entry : aggregateCommitInfo.getTransactionMap().entrySet()) {
- // rollback the file
- for (Map.Entry<String, String> mvFileEntry : entry.getValue().entrySet()) {
- if (HdfsUtils.fileExist(mvFileEntry.getValue()) && !HdfsUtils.fileExist(mvFileEntry.getKey())) {
- HdfsUtils.renameFile(mvFileEntry.getValue(), mvFileEntry.getKey(), true);
- }
- }
- // delete the transaction dir
- HdfsUtils.deleteFile(entry.getKey());
- }
- }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/util/HdfsUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/util/HdfsUtils.java
deleted file mode 100644
index 7b5b972ed..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/util/HdfsUtils.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util;
-
-import lombok.NonNull;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-public class HdfsUtils {
- private static final Logger LOGGER = LoggerFactory.getLogger(HdfsUtils.class);
-
- public static final int WRITE_BUFFER_SIZE = 2048;
-
- public static final Configuration CONF = new Configuration();
-
- // make the configuration object static, so orc and parquet reader can get it
- static {
- LOGGER.info(System.getenv("HADOOP_CONF_DIR"));
- CONF.addResource(new Path(System.getenv("HADOOP_CONF_DIR") + "/core-site.xml"));
- CONF.addResource(new Path(System.getenv("HADOOP_CONF_DIR") + "/hdfs-site.xml"));
- CONF.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
- }
-
- public static FileSystem getHdfsFs(@NonNull String path)
- throws IOException {
- return FileSystem.get(URI.create(path), CONF);
- }
-
- public static FSDataOutputStream getOutputStream(@NonNull String outFilePath) throws IOException {
- FileSystem hdfsFs = getHdfsFs(outFilePath);
- Path path = new Path(outFilePath);
- FSDataOutputStream fsDataOutputStream = hdfsFs.create(path, true, WRITE_BUFFER_SIZE);
- return fsDataOutputStream;
- }
-
- public static void createFile(@NonNull String filePath) throws IOException {
- FileSystem hdfsFs = getHdfsFs(filePath);
- Path path = new Path(filePath);
- if (!hdfsFs.createNewFile(path)) {
- throw new IOException("create file " + filePath + " error");
- }
- }
-
- public static void deleteFile(@NonNull String file) throws IOException {
- FileSystem hdfsFs = getHdfsFs(file);
- if (!hdfsFs.delete(new Path(file), true)) {
- throw new IOException("delete file " + file + " error");
- }
- }
-
- /**
- * rename file
- *
- * @param oldName old file name
- * @param newName target file name
- * @param rmWhenExist if this is true, we will delete the target file when it already exists
- * @throws IOException throw IOException
- */
- public static void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist) throws IOException {
- FileSystem hdfsFs = getHdfsFs(newName);
- LOGGER.info("begin rename file oldName :[" + oldName + "] to newName :[" + newName + "]");
-
- Path oldPath = new Path(oldName);
- Path newPath = new Path(newName);
- if (rmWhenExist) {
- if (fileExist(newName) && fileExist(oldName)) {
- hdfsFs.delete(newPath, true);
- }
- }
- if (!fileExist(newName.substring(0, newName.lastIndexOf("/")))) {
- createDir(newName.substring(0, newName.lastIndexOf("/")));
- }
-
- if (hdfsFs.rename(oldPath, newPath)) {
- LOGGER.info("rename file :[" + oldPath + "] to [" + newPath + "] finish");
- } else {
- throw new IOException("rename file :[" + oldPath + "] to [" + newPath + "] error");
- }
- }
-
- public static void createDir(@NonNull String filePath) throws IOException {
- FileSystem hdfsFs = getHdfsFs(filePath);
- Path dfs = new Path(filePath);
- if (!hdfsFs.mkdirs(dfs)) {
- throw new IOException("create dir " + filePath + " error");
- }
- }
-
- public static boolean fileExist(@NonNull String filePath) throws IOException {
- FileSystem hdfsFs = getHdfsFs(filePath);
- Path fileName = new Path(filePath);
- return hdfsFs.exists(fileName);
- }
-
- /**
- * get the dir in filePath
- */
- public static List<Path> dirList(@NonNull String filePath) throws FileNotFoundException, IOException {
- FileSystem hdfsFs = getHdfsFs(filePath);
- List<Path> pathList = new ArrayList<Path>();
- Path fileName = new Path(filePath);
- FileStatus[] status = hdfsFs.listStatus(fileName);
- if (status != null && status.length > 0) {
- for (FileStatus fileStatus : status) {
- if (fileStatus.isDirectory()) {
- pathList.add(fileStatus.getPath());
- }
- }
- }
- return pathList;
- }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsJsonTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsJsonTransactionStateFileWriter.java
deleted file mode 100644
index c11d57ce6..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsJsonTransactionStateFileWriter.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.writer;
-
-import org.apache.seatunnel.api.serialization.SerializationSchema;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
-import org.apache.seatunnel.format.json.JsonSerializationSchema;
-
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@Slf4j
-public class HdfsJsonTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
-
- private static final long serialVersionUID = -5432828969702531646L;
-
- private final byte[] rowDelimiter;
- private final SerializationSchema serializationSchema;
- private Map<String, FSDataOutputStream> beingWrittenOutputStream;
-
- public HdfsJsonTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
- @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
- @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
- @NonNull List<Integer> sinkColumnsIndexInRow,
- @NonNull String tmpPath,
- @NonNull String targetPath,
- @NonNull String jobId,
- int subTaskIndex,
- @NonNull String rowDelimiter,
- @NonNull FileSystem fileSystem) {
- super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
-
- this.rowDelimiter = rowDelimiter.getBytes();
- this.serializationSchema = new JsonSerializationSchema(seaTunnelRowTypeInfo);
- beingWrittenOutputStream = new HashMap<>();
- }
-
- @Override
- public void beginTransaction(String transactionId) {
- this.beingWrittenOutputStream = new HashMap<>();
- }
-
- @Override
- public void abortTransaction(String transactionId) {
- this.beingWrittenOutputStream = new HashMap<>();
- }
-
- @Override
- public void write(@NonNull SeaTunnelRow seaTunnelRow) {
- String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
- FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath);
- try {
- byte[] rowBytes = serializationSchema.serialize(seaTunnelRow);
- fsDataOutputStream.write(rowBytes);
- fsDataOutputStream.write(rowDelimiter);
- } catch (IOException e) {
- log.error("write data to file {} error", filePath);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void finishAndCloseWriteFile() {
- beingWrittenOutputStream.entrySet().forEach(entry -> {
- try {
- entry.getValue().flush();
- } catch (IOException e) {
- log.error("error when flush file {}", entry.getKey());
- throw new RuntimeException(e);
- } finally {
- try {
- entry.getValue().close();
- } catch (IOException e) {
- log.error("error when close output stream {}", entry.getKey());
- }
- }
-
- needMoveFiles.put(entry.getKey(), getTargetLocation(entry.getKey()));
- });
- }
-
- private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
- FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath);
- if (fsDataOutputStream == null) {
- try {
- fsDataOutputStream = HdfsUtils.getOutputStream(filePath);
- beingWrittenOutputStream.put(filePath, fsDataOutputStream);
- } catch (IOException e) {
- log.error("can not get output file stream");
- throw new RuntimeException(e);
- }
- }
- return fsDataOutputStream;
- }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsOrcTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsOrcTransactionStateFileWriter.java
deleted file mode 100644
index 23648359c..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsOrcTransactionStateFileWriter.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.writer;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
-
-import lombok.NonNull;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.OrcFile;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.Writer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class HdfsOrcTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
- private static final Logger LOGGER = LoggerFactory.getLogger(HdfsOrcTransactionStateFileWriter.class);
- private Map<String, Writer> beingWrittenWriter;
-
- public HdfsOrcTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo, @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
- @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
- @NonNull List<Integer> sinkColumnsIndexInRow,
- @NonNull String tmpPath,
- @NonNull String targetPath,
- @NonNull String jobId,
- int subTaskIndex,
- @NonNull FileSystem fileSystem) {
- super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
- this.beingWrittenWriter = new HashMap<>();
- }
-
- @Override
- public void write(@NonNull SeaTunnelRow seaTunnelRow) {
- String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
- Writer writer = getOrCreateWriter(filePath);
- TypeDescription schema = buildSchemaWithRowType();
- VectorizedRowBatch rowBatch = schema.createRowBatch();
- int i = 0;
- int row = rowBatch.size++;
- for (Integer index : sinkColumnsIndexInRow) {
- Object value = seaTunnelRow.getField(index);
- ColumnVector vector = rowBatch.cols[i];
- setColumn(value, vector, row);
- i++;
- }
- try {
- writer.addRowBatch(rowBatch);
- rowBatch.reset();
- } catch (IOException e) {
- String errorMsg = String.format("Write data to orc file [%s] error", filePath);
- throw new RuntimeException(errorMsg, e);
- }
- }
-
- @Override
- public void finishAndCloseWriteFile() {
- this.beingWrittenWriter.forEach((k, v) -> {
- try {
- v.close();
- } catch (IOException e) {
- String errorMsg = String.format("Close file [%s] orc writer failed, error msg: [%s]", k, e.getMessage());
- throw new RuntimeException(errorMsg, e);
- } catch (NullPointerException e) {
- // Because orc writer not support be closed multi times, so if the second time close orc writer it will throw NullPointerException
- // In a whole process of file sink, it will experience four stages:
- // 1. beginTransaction 2. prepareCommit 3. commit 4. close
- // In the first stage, it will not close any writers, start with the second stage, writer will be closed.
- // In the last stage, it will not close any writers
- // So orc writer will be closed one extra time after is closed.
- LOGGER.info("Close file [{}] orc writer", k);
- }
- needMoveFiles.put(k, getTargetLocation(k));
- });
- }
-
- @Override
- public void beginTransaction(String transactionId) {
- this.beingWrittenWriter = new HashMap<>();
- }
-
- @Override
- public void abortTransaction(String transactionId) {
- this.beingWrittenWriter = new HashMap<>();
- }
-
- private Writer getOrCreateWriter(@NonNull String filePath) {
- Writer writer = this.beingWrittenWriter.get(filePath);
- if (writer == null) {
- TypeDescription schema = buildSchemaWithRowType();
- Path path = new Path(filePath);
- try {
- OrcFile.WriterOptions options = OrcFile.writerOptions(HdfsUtils.CONF)
- .setSchema(schema)
- // temporarily used snappy
- .compress(CompressionKind.SNAPPY)
- // use orc version 0.12
- .version(OrcFile.Version.V_0_12)
- .overwrite(true);
- Writer newWriter = OrcFile.createWriter(path, options);
- this.beingWrittenWriter.put(filePath, newWriter);
- return newWriter;
- } catch (IOException e) {
- String errorMsg = String.format("Get orc writer for file [%s] error", filePath);
- throw new RuntimeException(errorMsg, e);
- }
- }
- return writer;
- }
-
- private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
- if (BasicType.BOOLEAN_TYPE.equals(type)) {
- return TypeDescription.createBoolean();
- }
- if (BasicType.SHORT_TYPE.equals(type)) {
- return TypeDescription.createShort();
- }
- if (BasicType.INT_TYPE.equals(type)) {
- return TypeDescription.createInt();
- }
- if (BasicType.LONG_TYPE.equals(type)) {
- return TypeDescription.createLong();
- }
- if (BasicType.FLOAT_TYPE.equals(type)) {
- return TypeDescription.createFloat();
- }
- if (BasicType.DOUBLE_TYPE.equals(type)) {
- return TypeDescription.createDouble();
- }
- if (BasicType.BYTE_TYPE.equals(type)) {
- return TypeDescription.createByte();
- }
- return TypeDescription.createString();
- }
-
- private TypeDescription buildSchemaWithRowType() {
- TypeDescription schema = TypeDescription.createStruct();
- for (Integer i : sinkColumnsIndexInRow) {
- TypeDescription fieldType = buildFieldWithRowType(seaTunnelRowTypeInfo.getFieldType(i));
- schema.addField(seaTunnelRowTypeInfo.getFieldName(i), fieldType);
- }
- return schema;
- }
-
- private void setColumn(Object value, ColumnVector vector, int row) {
- if (value == null) {
- vector.isNull[row] = true;
- vector.noNulls = false;
- } else {
- switch (vector.type) {
- case LONG:
- LongColumnVector longVector = (LongColumnVector) vector;
- setLongColumnVector(value, longVector, row);
- break;
- case DOUBLE:
- DoubleColumnVector doubleColumnVector = (DoubleColumnVector) vector;
- setDoubleVector(value, doubleColumnVector, row);
- break;
- case BYTES:
- BytesColumnVector bytesColumnVector = (BytesColumnVector) vector;
- setByteColumnVector(value, bytesColumnVector, row);
- break;
- default:
- throw new RuntimeException("Unexpected ColumnVector subtype");
- }
- }
- }
-
- private void setLongColumnVector(Object value, LongColumnVector longVector, int row) {
- if (value instanceof Boolean) {
- Boolean bool = (Boolean) value;
- longVector.vector[row] = (bool.equals(Boolean.TRUE)) ? Long.valueOf(1) : Long.valueOf(0);
- } else if (value instanceof Integer) {
- longVector.vector[row] = (Integer) value;
- } else if (value instanceof Long) {
- longVector.vector[row] = (Long) value;
- } else if (value instanceof BigInteger) {
- BigInteger bigInt = (BigInteger) value;
- longVector.vector[row] = bigInt.longValue();
- } else {
- throw new RuntimeException("Long or Integer type expected for field");
- }
- }
-
- private void setByteColumnVector(Object value, BytesColumnVector bytesColVector, int rowNum) {
- if (value instanceof byte[] || value instanceof String) {
- byte[] byteVec;
- if (value instanceof String) {
- String strVal = (String) value;
- byteVec = strVal.getBytes(StandardCharsets.UTF_8);
- } else {
- byteVec = (byte[]) value;
- }
- bytesColVector.setRef(rowNum, byteVec, 0, byteVec.length);
- } else {
- throw new RuntimeException("byte[] or String type expected for field ");
- }
- }
-
- private void setDoubleVector(Object value, DoubleColumnVector doubleVector, int rowNum) {
- if (value instanceof Double) {
- doubleVector.vector[rowNum] = (Double) value;
- } else if (value instanceof Float) {
- Float floatValue = (Float) value;
- doubleVector.vector[rowNum] = floatValue.doubleValue();
- } else {
- throw new RuntimeException("Double or Float type expected for field ");
- }
- }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsParquetTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsParquetTransactionStateFileWriter.java
deleted file mode 100644
index 8b904fda9..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsParquetTransactionStateFileWriter.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.writer;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
-
-import lombok.NonNull;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroParquetWriter;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.hadoop.util.HadoopOutputFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class HdfsParquetTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
- private static final Logger LOGGER = LoggerFactory.getLogger(HdfsParquetTransactionStateFileWriter.class);
- private Map<String, ParquetWriter<GenericRecord>> beingWrittenWriter;
-
- public HdfsParquetTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
- @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
- @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
- @NonNull List<Integer> sinkColumnsIndexInRow, @NonNull String tmpPath,
- @NonNull String targetPath,
- @NonNull String jobId,
- int subTaskIndex,
- @NonNull FileSystem fileSystem) {
- super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
- beingWrittenWriter = new HashMap<>();
- }
-
- @Override
- public void write(@NonNull SeaTunnelRow seaTunnelRow) {
- String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
- ParquetWriter<GenericRecord> writer = getOrCreateWriter(filePath);
- Schema schema = buildSchemaWithRowType();
- GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
- sinkColumnsIndexInRow.forEach(index -> recordBuilder.set(seaTunnelRowTypeInfo.getFieldName(index), seaTunnelRow.getField(index)));
- GenericData.Record record = recordBuilder.build();
- try {
- writer.write(record);
- } catch (IOException e) {
- String errorMsg = String.format("Write data to file [%s] error", filePath);
- throw new RuntimeException(errorMsg, e);
- }
- }
-
- @Override
- public void finishAndCloseWriteFile() {
- this.beingWrittenWriter.forEach((k, v) -> {
- try {
- v.close();
- } catch (IOException e) {
- String errorMsg = String.format("Close file [%s] parquet writer failed, error msg: [%s]", k, e.getMessage());
- throw new RuntimeException(errorMsg, e);
- }
- needMoveFiles.put(k, getTargetLocation(k));
- });
- }
-
- @Override
- public void beginTransaction(String transactionId) {
- this.beingWrittenWriter = new HashMap<>();
- }
-
- @Override
- public void abortTransaction(String transactionId) {
- this.beingWrittenWriter = new HashMap<>();
- }
-
- private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath) {
- ParquetWriter<GenericRecord> writer = this.beingWrittenWriter.get(filePath);
- if (writer == null) {
- Schema schema = buildSchemaWithRowType();
- Path path = new Path(filePath);
- try {
- HadoopOutputFile outputFile = HadoopOutputFile.fromPath(path, HdfsUtils.CONF);
- ParquetWriter<GenericRecord> newWriter = AvroParquetWriter.<GenericRecord>builder(outputFile)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- // use parquet v1 to improve compatibility
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
- // Temporarily use snappy compress
- // I think we can use the compress option in config to control this
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withSchema(schema)
- .build();
- this.beingWrittenWriter.put(filePath, newWriter);
- return newWriter;
- } catch (IOException e) {
- String errorMsg = String.format("Get parquet writer for file [%s] error", filePath);
- throw new RuntimeException(errorMsg, e);
- }
- }
- return writer;
- }
-
- private Schema buildSchemaWithRowType() {
- ArrayList<Schema.Field> fields = new ArrayList<>();
- SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowTypeInfo.getFieldTypes();
- String[] fieldNames = seaTunnelRowTypeInfo.getFieldNames();
- sinkColumnsIndexInRow.forEach(index -> {
- if (BasicType.BOOLEAN_TYPE.equals(fieldTypes[index])) {
- Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.BOOLEAN), null, null);
- fields.add(field);
- } else if (BasicType.SHORT_TYPE.equals(fieldTypes[index]) || BasicType.INT_TYPE.equals(fieldTypes[index])) {
- Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.INT), null, null);
- fields.add(field);
- } else if (BasicType.LONG_TYPE.equals(fieldTypes[index])) {
- Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.LONG), null, null);
- fields.add(field);
- } else if (BasicType.FLOAT_TYPE.equals(fieldTypes[index])) {
- Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.FLOAT), null, null);
- fields.add(field);
- } else if (BasicType.DOUBLE_TYPE.equals(fieldTypes[index])) {
- Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.DOUBLE), null, null);
- fields.add(field);
- } else if (BasicType.STRING_TYPE.equals(fieldTypes[index])) {
- Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.STRING), null, null);
- fields.add(field);
- } else if (BasicType.BYTE_TYPE.equals(fieldTypes[index])) {
- Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.BYTES), null, null);
- fields.add(field);
- } else if (BasicType.VOID_TYPE.equals(fieldTypes[index])) {
- Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.NULL), null, null);
- fields.add(field);
- }
- });
- return Schema.createRecord("SeatunnelRecord",
- "The record generated by seatunnel file connector",
- "org.apache.parquet.avro",
- false,
- fields);
- }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsTransactionStateFileWriteFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsTransactionStateFileWriteFactory.java
deleted file mode 100644
index 814491b4f..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsTransactionStateFileWriteFactory.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.writer;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
-
-import lombok.NonNull;
-
-import java.util.List;
-
-public class HdfsTransactionStateFileWriteFactory {
-
- private HdfsTransactionStateFileWriteFactory() {}
-
- public static TransactionStateFileWriter of(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
- @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
- @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
- @NonNull List<Integer> sinkColumnsIndexInRow,
- @NonNull String tmpPath,
- @NonNull String targetPath,
- @NonNull String jobId,
- int subTaskIndex,
- @NonNull String fieldDelimiter,
- @NonNull String rowDelimiter,
- @NonNull FileSystem fileSystem) {
- FileSinkTransactionFileNameGenerator fileSinkTransactionFileNameGenerator = (FileSinkTransactionFileNameGenerator) transactionFileNameGenerator;
- FileFormat fileFormat = fileSinkTransactionFileNameGenerator.getFileFormat();
- if (fileFormat.equals(FileFormat.CSV)) {
- // #2133 wait this issue closed, there will be replaced using csv writer
- return new HdfsTxtTransactionStateFileWriter(
- seaTunnelRowTypeInfo,
- transactionFileNameGenerator,
- partitionDirNameGenerator,
- sinkColumnsIndexInRow,
- tmpPath,
- targetPath,
- jobId,
- subTaskIndex,
- fieldDelimiter,
- rowDelimiter,
- fileSystem);
- }
- if (fileFormat.equals(FileFormat.PARQUET)) {
- return new HdfsParquetTransactionStateFileWriter(
- seaTunnelRowTypeInfo,
- transactionFileNameGenerator,
- partitionDirNameGenerator,
- sinkColumnsIndexInRow,
- tmpPath,
- targetPath,
- jobId,
- subTaskIndex,
- fileSystem);
- }
- if (fileFormat.equals(FileFormat.ORC)) {
- return new HdfsOrcTransactionStateFileWriter(
- seaTunnelRowTypeInfo,
- transactionFileNameGenerator,
- partitionDirNameGenerator,
- sinkColumnsIndexInRow,
- tmpPath,
- targetPath,
- jobId,
- subTaskIndex,
- fileSystem);
- }
- if (fileFormat.equals(FileFormat.JSON)) {
- return new HdfsJsonTransactionStateFileWriter(
- seaTunnelRowTypeInfo,
- transactionFileNameGenerator,
- partitionDirNameGenerator,
- sinkColumnsIndexInRow,
- tmpPath,
- targetPath,
- jobId,
- subTaskIndex,
- rowDelimiter,
- fileSystem);
- }
- // if file type not supported by file connector, default txt writer will be generated
- return new HdfsTxtTransactionStateFileWriter(
- seaTunnelRowTypeInfo,
- transactionFileNameGenerator,
- partitionDirNameGenerator,
- sinkColumnsIndexInRow,
- tmpPath,
- targetPath,
- jobId,
- subTaskIndex,
- fieldDelimiter,
- rowDelimiter,
- fileSystem);
- }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsTxtTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsTxtTransactionStateFileWriter.java
deleted file mode 100644
index 2ee792a10..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/writer/HdfsTxtTransactionStateFileWriter.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.writer;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
-
-import lombok.NonNull;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class HdfsTxtTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
- private static final Logger LOGGER = LoggerFactory.getLogger(HdfsTxtTransactionStateFileWriter.class);
- private Map<String, FSDataOutputStream> beingWrittenOutputStream;
-
- private String fieldDelimiter;
- private String rowDelimiter;
-
- public HdfsTxtTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
- @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
- @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
- @NonNull List<Integer> sinkColumnsIndexInRow,
- @NonNull String tmpPath,
- @NonNull String targetPath,
- @NonNull String jobId,
- int subTaskIndex,
- @NonNull String fieldDelimiter,
- @NonNull String rowDelimiter,
- @NonNull FileSystem fileSystem) {
- super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
-
- this.fieldDelimiter = fieldDelimiter;
- this.rowDelimiter = rowDelimiter;
- beingWrittenOutputStream = new HashMap<>();
- }
-
- @Override
- public void beginTransaction(String transactionId) {
- this.beingWrittenOutputStream = new HashMap<>();
- }
-
- @Override
- public void abortTransaction(String transactionId) {
- this.beingWrittenOutputStream = new HashMap<>();
- }
-
- @Override
- public void write(@NonNull SeaTunnelRow seaTunnelRow) {
- String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
- FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath);
- String line = transformRowToLine(seaTunnelRow);
- try {
- fsDataOutputStream.write(line.getBytes());
- fsDataOutputStream.write(rowDelimiter.getBytes());
- } catch (IOException e) {
- LOGGER.error("write data to file {} error", filePath);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void finishAndCloseWriteFile() {
- beingWrittenOutputStream.entrySet().forEach(entry -> {
- try {
- entry.getValue().flush();
- } catch (IOException e) {
- LOGGER.error("error when flush file {}", entry.getKey());
- throw new RuntimeException(e);
- } finally {
- try {
- entry.getValue().close();
- } catch (IOException e) {
- LOGGER.error("error when close output stream {}", entry.getKey());
- }
- }
-
- needMoveFiles.put(entry.getKey(), getTargetLocation(entry.getKey()));
- });
- }
-
- private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
- FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath);
- if (fsDataOutputStream == null) {
- try {
- fsDataOutputStream = HdfsUtils.getOutputStream(filePath);
- beingWrittenOutputStream.put(filePath, fsDataOutputStream);
- } catch (IOException e) {
- LOGGER.error("can not get output file stream");
- throw new RuntimeException(e);
- }
- }
- return fsDataOutputStream;
- }
-
- private String transformRowToLine(@NonNull SeaTunnelRow seaTunnelRow) {
- return this.sinkColumnsIndexInRow.stream().map(index -> seaTunnelRow.getFields()[index] == null ? "" : seaTunnelRow.getFields()[index].toString()).collect(Collectors.joining(fieldDelimiter));
- }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/FileSinkAggregatedCommitterTest.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/FileSinkAggregatedCommitterTest.java
deleted file mode 100644
index e76ee1d43..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/FileSinkAggregatedCommitterTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
-
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.filesystem.HdfsFileSystemCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileSinkAggregatedCommitter;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.EnabledOnOs;
-import org.junit.jupiter.api.condition.OS;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.stream.Collectors;
-
-@EnabledOnOs(value = {OS.MAC, OS.LINUX})
-public class FileSinkAggregatedCommitterTest {
- @SuppressWarnings("checkstyle:UnnecessaryParentheses")
- @Test
- public void testCommit() throws Exception {
- FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(new HdfsFileSystemCommitter());
- Map<String, Map<String, String>> transactionFiles = new HashMap<>();
- Random random = new Random();
- Long jobId = random.nextLong();
- String transactionDir = String.format("/tmp/seatunnel/seatunnel/%s/T_%s_0_1", jobId, jobId);
- String targetDir = String.format("/tmp/hive/warehouse/%s", jobId);
- Map<String, String> needMoveFiles = new HashMap<>();
- needMoveFiles.put(transactionDir + "/c3=4/c4=rrr/test1.txt", targetDir + "/c3=4/c4=rrr/test1.txt");
- needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir + "/c3=4/c4=bbb/test1.txt");
- HdfsUtils.createFile(transactionDir + "/c3=4/c4=rrr/test1.txt");
- HdfsUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
-
- transactionFiles.put(transactionDir, needMoveFiles);
-
- Map<String, List<String>> partitionDirAndVals = new HashMap<>();
- partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new String[]{"4", "rrr"})).collect(Collectors.toList()));
- partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList()));
-
- FileAggregatedCommitInfo fileAggregatedCommitInfo = new FileAggregatedCommitInfo(transactionFiles, partitionDirAndVals);
- List<FileAggregatedCommitInfo> fileAggregatedCommitInfoList = new ArrayList<>();
- fileAggregatedCommitInfoList.add(fileAggregatedCommitInfo);
- fileSinkAggregatedCommitter.commit(fileAggregatedCommitInfoList);
-
- Assertions.assertTrue(HdfsUtils.fileExist(targetDir + "/c3=4/c4=bbb/test1.txt"));
- Assertions.assertTrue(HdfsUtils.fileExist(targetDir + "/c3=4/c4=rrr/test1.txt"));
- Assertions.assertTrue(!HdfsUtils.fileExist(transactionDir));
- }
-
- @SuppressWarnings("checkstyle:UnnecessaryParentheses")
- @Test
- public void testCombine() throws Exception {
- FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(new HdfsFileSystemCommitter());
- Map<String, Map<String, String>> transactionFiles = new HashMap<>();
- Random random = new Random();
- Long jobId = random.nextLong();
- String transactionDir = String.format("/tmp/seatunnel/seatunnel/%s/T_%s_0_1", jobId, jobId);
- String targetDir = String.format("/tmp/hive/warehouse/%s", jobId);
- Map<String, String> needMoveFiles = new HashMap<>();
- needMoveFiles.put(transactionDir + "/c3=3/c4=rrr/test1.txt", targetDir + "/c3=3/c4=rrr/test1.txt");
- needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir + "/c3=4/c4=bbb/test1.txt");
- Map<String, List<String>> partitionDirAndVals = new HashMap<>();
- partitionDirAndVals.put("/c3=3/c4=rrr", Arrays.stream((new String[]{"3", "rrr"})).collect(Collectors.toList()));
- partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList()));
- FileCommitInfo fileCommitInfo = new FileCommitInfo(needMoveFiles, partitionDirAndVals, transactionDir);
- HdfsUtils.createFile(transactionDir + "/c3=3/c4=rrr/test1.txt");
- HdfsUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
-
- Map<String, String> needMoveFiles1 = new HashMap<>();
- needMoveFiles1.put(transactionDir + "/c3=4/c4=rrr/test2.txt", targetDir + "/c3=4/c4=rrr/test2.txt");
- needMoveFiles1.put(transactionDir + "/c3=4/c4=bbb/test2.txt", targetDir + "/c3=4/c4=bbb/test2.txt");
- Map<String, List<String>> partitionDirAndVals1 = new HashMap<>();
- partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new String[]{"4", "rrr"})).collect(Collectors.toList()));
- partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList()));
- FileCommitInfo fileCommitInfo1 = new FileCommitInfo(needMoveFiles1, partitionDirAndVals1, transactionDir);
- List<FileCommitInfo> fileCommitInfoList = new ArrayList<>();
- fileCommitInfoList.add(fileCommitInfo);
- fileCommitInfoList.add(fileCommitInfo1);
-
- FileAggregatedCommitInfo combine = fileSinkAggregatedCommitter.combine(fileCommitInfoList);
- Assertions.assertEquals(1, combine.getTransactionMap().size());
- Assertions.assertEquals(4, combine.getTransactionMap().get(transactionDir).size());
- Assertions.assertEquals(targetDir + "/c3=3/c4=rrr/test1.txt", combine.getTransactionMap().get(transactionDir).get(transactionDir + "/c3=3/c4=rrr/test1.txt"));
- Assertions.assertEquals(targetDir + "/c3=4/c4=bbb/test1.txt", combine.getTransactionMap().get(transactionDir).get(transactionDir + "/c3=4/c4=bbb/test1.txt"));
- Assertions.assertEquals(targetDir + "/c3=4/c4=rrr/test2.txt", combine.getTransactionMap().get(transactionDir).get(transactionDir + "/c3=4/c4=rrr/test2.txt"));
- Assertions.assertEquals(targetDir + "/c3=4/c4=bbb/test2.txt", combine.getTransactionMap().get(transactionDir).get(transactionDir + "/c3=4/c4=bbb/test2.txt"));
- Assertions.assertEquals(3, combine.getPartitionDirAndValsMap().keySet().size());
- }
-
- @SuppressWarnings("checkstyle:UnnecessaryParentheses")
- @Test
- public void testAbort() throws Exception {
- FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(new HdfsFileSystemCommitter());
- Map<String, Map<String, String>> transactionFiles = new HashMap<>();
- Random random = new Random();
- Long jobId = random.nextLong();
- String transactionDir = String.format("/tmp/seatunnel/seatunnel/%s/T_%s_0_1", jobId, jobId);
- String targetDir = String.format("/tmp/hive/warehouse/%s", jobId);
- Map<String, String> needMoveFiles = new HashMap<>();
- needMoveFiles.put(transactionDir + "/c3=4/c4=rrr/test1.txt", targetDir + "/c3=4/c4=rrr/test1.txt");
- needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir + "/c3=4/c4=bbb/test1.txt");
- Map<String, List<String>> partitionDirAndVals = new HashMap<>();
- partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new String[]{"4", "rrr"})).collect(Collectors.toList()));
- partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList()));
- HdfsUtils.createFile(transactionDir + "/c3=4/c4=rrr/test1.txt");
- HdfsUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
-
- transactionFiles.put(transactionDir, needMoveFiles);
- FileAggregatedCommitInfo fileAggregatedCommitInfo = new FileAggregatedCommitInfo(transactionFiles, partitionDirAndVals);
- List<FileAggregatedCommitInfo> fileAggregatedCommitInfoList = new ArrayList<>();
- fileAggregatedCommitInfoList.add(fileAggregatedCommitInfo);
- fileSinkAggregatedCommitter.commit(fileAggregatedCommitInfoList);
-
- Assertions.assertTrue(HdfsUtils.fileExist(targetDir + "/c3=4/c4=bbb/test1.txt"));
- Assertions.assertTrue(HdfsUtils.fileExist(targetDir + "/c3=4/c4=rrr/test1.txt"));
- Assertions.assertTrue(!HdfsUtils.fileExist(transactionDir));
-
- fileSinkAggregatedCommitter.abort(fileAggregatedCommitInfoList);
- Assertions.assertTrue(!HdfsUtils.fileExist(targetDir + "/c3=4/c4=bbb/test1.txt"));
- Assertions.assertTrue(!HdfsUtils.fileExist(targetDir + "/c3=4/c4=rrr/test1.txt"));
-
- // transactionDir will be deleted when abort is called
- Assertions.assertTrue(!HdfsUtils.fileExist(transactionDir));
- }
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/TestHdfsTxtTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/TestHdfsTxtTransactionStateFileWriter.java
deleted file mode 100644
index 88ca2b9f4..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/TestHdfsTxtTransactionStateFileWriter.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.filesystem.HdfsFileSystem;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.writer.HdfsTxtTransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkPartitionDirNameGenerator;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.EnabledOnOs;
-import org.junit.jupiter.api.condition.OS;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-@EnabledOnOs(value = {OS.MAC, OS.LINUX})
-public class TestHdfsTxtTransactionStateFileWriter {
-
- @SuppressWarnings("checkstyle:MagicNumber")
- @Test
- public void testHdfsTextTransactionStateFileWriter() throws Exception {
- String[] fieldNames = new String[]{"c1", "c2", "c3", "c4"};
- SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[]{BasicType.BOOLEAN_TYPE, BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE};
- SeaTunnelRowType seaTunnelRowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
-
- List<Integer> sinkColumnIndexInRow = new ArrayList<>();
- sinkColumnIndexInRow.add(0);
- sinkColumnIndexInRow.add(1);
-
- List<String> hivePartitionFieldList = new ArrayList<>();
- hivePartitionFieldList.add("c3");
- hivePartitionFieldList.add("c4");
-
- List<Integer> partitionFieldIndexInRow = new ArrayList<>();
- partitionFieldIndexInRow.add(2);
- partitionFieldIndexInRow.add(3);
-
- String jobId = System.currentTimeMillis() + "";
- String targetPath = "/tmp/hive/warehouse/seatunnel.db/test1";
- String tmpPath = "/tmp/seatunnel";
-
- TransactionStateFileWriter fileWriter = new HdfsTxtTransactionStateFileWriter(seaTunnelRowTypeInfo,
- new FileSinkTransactionFileNameGenerator(FileFormat.TEXT, null, "yyyy.MM.dd"),
- new FileSinkPartitionDirNameGenerator(hivePartitionFieldList, partitionFieldIndexInRow, "${k0}=${v0}/${k1}=${v1}"),
- sinkColumnIndexInRow,
- tmpPath,
- targetPath,
- jobId,
- 0,
- String.valueOf('\001'),
- "\n",
- new HdfsFileSystem());
-
- String transactionId = fileWriter.beginTransaction(1L);
-
- SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{true, 1, "str1", "str2"});
- fileWriter.write(seaTunnelRow);
-
- SeaTunnelRow seaTunnelRow1 = new SeaTunnelRow(new Object[]{true, 1, "str1", "str3"});
- fileWriter.write(seaTunnelRow1);
-
- Optional<FileCommitInfo> fileCommitInfoOptional = fileWriter.prepareCommit();
- //check file exists and file content
- Assertions.assertTrue(fileCommitInfoOptional.isPresent());
- FileCommitInfo fileCommitInfo = fileCommitInfoOptional.get();
- String transactionDir = tmpPath + "/seatunnel/" + jobId + "/" + transactionId;
- Assertions.assertEquals(transactionDir, fileCommitInfo.getTransactionDir());
- Assertions.assertEquals(2, fileCommitInfo.getNeedMoveFiles().size());
- Map<String, String> needMoveFiles = fileCommitInfo.getNeedMoveFiles();
- Assertions.assertEquals(targetPath + "/c3=str1/c4=str2/" + transactionId + ".txt", needMoveFiles.get(transactionDir + "/c3=str1/c4=str2/" + transactionId + ".txt"));
- Assertions.assertEquals(targetPath + "/c3=str1/c4=str3/" + transactionId + ".txt", needMoveFiles.get(transactionDir + "/c3=str1/c4=str3/" + transactionId + ".txt"));
-
- Map<String, List<String>> partitionDirAndValsMap = fileCommitInfo.getPartitionDirAndValsMap();
- Assertions.assertEquals(2, partitionDirAndValsMap.size());
- Assertions.assertTrue(partitionDirAndValsMap.keySet().contains("c3=str1/c4=str2"));
- Assertions.assertTrue(partitionDirAndValsMap.keySet().contains("c3=str1/c4=str3"));
- Assertions.assertTrue(partitionDirAndValsMap.get("c3=str1/c4=str2").size() == 2);
- Assertions.assertEquals("str1", partitionDirAndValsMap.get("c3=str1/c4=str2").get(0));
- Assertions.assertEquals("str2", partitionDirAndValsMap.get("c3=str1/c4=str2").get(1));
- Assertions.assertEquals("str1", partitionDirAndValsMap.get("c3=str1/c4=str3").get(0));
- Assertions.assertEquals("str3", partitionDirAndValsMap.get("c3=str1/c4=str3").get(1));
- }
-}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java
index 50d7c34d2..d6bfdefc5 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/file/FakeSourceToFileIT.java
@@ -53,31 +53,4 @@ public class FakeSourceToFileIT extends FlinkContainer {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/file/fakesource_to_local_json.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
-
- /**
- * fake source -> hdfs text file sink
- */
- @Test
- public void testFakeSourceToHdfsFileText() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelFlinkJob("/file/fakesource_to_hdfs_text.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- }
-
- /**
- * fake source -> hdfs parquet file sink
- */
- @Test
- public void testFakeSourceToHdfsFileParquet() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelFlinkJob("/file/fakesource_to_hdfs_parquet.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- }
-
- /**
- * fake source -> hdfs json file sink
- */
- @Test
- public void testFakeSourceToHdfsFileJson() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelFlinkJob("/file/fakesource_to_hdfs_json.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- }
}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
deleted file mode 100644
index 269b85d08..000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- # You can set flink configuration here
- execution.parallelism = 1
- job.mode = "BATCH"
- #execution.checkpoint.interval = 10000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
-}
-
-source {
- FakeSource {
- result_table_name = "fake"
- schema = {
- fields {
- name = "string"
- age = "int"
- }
- }
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
-}
-
-transform {
- sql {
- sql = "select name,age from fake"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/transform/sql
-}
-
-sink {
- HdfsFile {
- path="/tmp/hive/warehouse/test2"
- row_delimiter="\n"
- partition_by=["age"]
- partition_dir_expression="${k0}=${v0}"
- is_partition_field_write_in_file=true
- file_name_expression="${transactionId}_${now}"
- file_format="json"
- sink_columns=["name","age"]
- filename_time_format="yyyy.MM.dd"
- is_enable_transaction=true
- save_mode="error"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
deleted file mode 100644
index 5e1ea5c01..000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
+++ /dev/null
@@ -1,71 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- # You can set flink configuration here
- execution.parallelism = 1
- job.mode = "BATCH"
- #execution.checkpoint.interval = 10000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
-}
-
-source {
- FakeSource {
- result_table_name = "fake"
- schema = {
- fields {
- name = "string"
- age = "int"
- }
- }
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
-}
-
-transform {
- sql {
- sql = "select name,age from fake"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/transform/sql
-}
-
-sink {
- HdfsFile {
- path="/tmp/hive/warehouse/test2"
- field_delimiter="\t"
- row_delimiter="\n"
- partition_by=["age"]
- partition_dir_expression="${k0}=${v0}"
- is_partition_field_write_in_file=true
- file_name_expression="${transactionId}_${now}"
- file_format="parquet"
- sink_columns=["name","age"]
- filename_time_format="yyyy.MM.dd"
- is_enable_transaction=true
- save_mode="error"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
deleted file mode 100644
index d4a8a745c..000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-file-flink-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
+++ /dev/null
@@ -1,71 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- # You can set flink configuration here
- execution.parallelism = 1
- job.mode = "BATCH"
- #execution.checkpoint.interval = 10000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
-}
-
-source {
- FakeSource {
- result_table_name = "fake"
- schema = {
- fields {
- name = "string"
- age = "int"
- }
- }
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
-}
-
-transform {
- sql {
- sql = "select name,age from fake"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/transform/sql
-}
-
-sink {
- HdfsFile {
- path="/tmp/hive/warehouse/test2"
- field_delimiter="\t"
- row_delimiter="\n"
- partition_by=["age"]
- partition_dir_expression="${k0}=${v0}"
- is_partition_field_write_in_file=true
- file_name_expression="${transactionId}_${now}"
- file_format="text"
- sink_columns=["name","age"]
- filename_time_format="yyyy.MM.dd"
- is_enable_transaction=true
- save_mode="error"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
index fccd8db06..b2535ebf2 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
@@ -57,32 +57,4 @@ public class FakeSourceToFileIT extends SparkContainer {
Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_local_json.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
-
-
- /**
- * fake source -> hdfs text file sink
- */
- @Test
- public void testFakeSourceToHdfsFileText() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_hdfs_text.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- }
-
- /**
- * fake source -> hdfs parquet file sink
- */
- @Test
- public void testFakeSourceToHdfsFileParquet() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_hdfs_parquet.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- }
-
- /**
- * fake source -> hdfs json file sink
- */
- @Test
- public void testFakeSourceToHdfsFileJson() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_hdfs_json.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- }
}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
deleted file mode 100644
index 40454bce0..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
+++ /dev/null
@@ -1,69 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- # You can set spark configuration here
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
- job.mode = "BATCH"
-}
-
-source {
- FakeSource {
- result_table_name = "fake"
- schema = {
- fields {
- name = "string"
- age = "int"
- }
- }
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
-}
-
-transform {
- sql {
- sql = "select name,age from fake"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/category/transform
-}
-
-sink {
- HdfsFile {
- path="/tmp/hive/warehouse/test2"
- row_delimiter="\n"
- partition_by=["age"]
- partition_dir_expression="${k0}=${v0}"
- is_partition_field_write_in_file=true
- file_name_expression="${transactionId}_${now}"
- file_format="json"
- sink_columns=["name","age"]
- filename_time_format="yyyy.MM.dd"
- is_enable_transaction=true
- save_mode="error"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
deleted file mode 100644
index 550990eea..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- # You can set spark configuration here
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
- job.mode = "BATCH"
-}
-
-source {
- FakeSource {
- result_table_name = "fake"
- schema = {
- fields {
- name = "string"
- age = "int"
- }
- }
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
-}
-
-transform {
- sql {
- sql = "select name,age from fake"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/category/transform
-}
-
-sink {
- HdfsFile {
- path="/tmp/hive/warehouse/test2"
- field_delimiter="\t"
- row_delimiter="\n"
- partition_by=["age"]
- partition_dir_expression="${k0}=${v0}"
- is_partition_field_write_in_file=true
- file_name_expression="${transactionId}_${now}"
- file_format="parquet"
- sink_columns=["name","age"]
- filename_time_format="yyyy.MM.dd"
- is_enable_transaction=true
- save_mode="error"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
deleted file mode 100644
index 2bf6afba6..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- # You can set spark configuration here
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
- job.mode = "BATCH"
-}
-
-source {
- FakeSource {
- result_table_name = "fake"
- schema = {
- fields {
- name = "string"
- age = "int"
- }
- }
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
-}
-
-transform {
- sql {
- sql = "select name,age from fake"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/category/transform
-}
-
-sink {
- HdfsFile {
- path="/tmp/hive/warehouse/test2"
- field_delimiter="\t"
- row_delimiter="\n"
- partition_by=["age"]
- partition_dir_expression="${k0}=${v0}"
- is_partition_field_write_in_file=true
- file_name_expression="${transactionId}_${now}"
- file_format="text"
- sink_columns=["name","age"]
- filename_time_format="yyyy.MM.dd"
- is_enable_transaction=true
- save_mode="error"
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
-}
\ No newline at end of file