Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/07/27 08:42:25 UTC

[incubator-seatunnel] branch dev updated: [Connector-V2] Add parquet writer in file connector (#2273)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c95cc72cf [Connector-V2] Add parquet writer in file connector (#2273)
c95cc72cf is described below

commit c95cc72cfa87589d46deaf22ff030059b072316a
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Wed Jul 27 16:42:19 2022 +0800

    [Connector-V2] Add parquet writer in file connector (#2273)
---
 docs/en/connector-v2/sink/File.mdx                 |   4 +-
 .../connector-file/connector-file-base/pom.xml     |   8 +
 .../seatunnel/file/config/FileFormat.java          |   3 +-
 .../FileSinkTransactionFileNameGenerator.java      |   2 +
 .../file/sink/hdfs/HdfsFileSinkPlugin.java         |  14 +-
 .../HdfsParquetTransactionStateFileWriter.java     | 174 ++++++++++++++++++++
 .../hdfs/HdfsTransactionStateFileWriteFactory.java |  90 +++++++++++
 .../seatunnel/file/sink/hdfs/HdfsUtils.java        |  29 ++--
 .../connector-file/connector-file-local/pom.xml    |   6 +
 .../file/sink/local/LocalFileSinkPlugin.java       |  14 +-
 .../LocalParquetTransactionStateFileWriter.java    | 177 +++++++++++++++++++++
 .../LocalTransactionStateFileWriteFactory.java     |  90 +++++++++++
 12 files changed, 573 insertions(+), 38 deletions(-)

diff --git a/docs/en/connector-v2/sink/File.mdx b/docs/en/connector-v2/sink/File.mdx
index 7e2e3efd7..74311b296 100644
--- a/docs/en/connector-v2/sink/File.mdx
+++ b/docs/en/connector-v2/sink/File.mdx
@@ -46,7 +46,9 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
 
 ### file_format [string]
 
-We supported `file_format` is `text`.
+We support the following file types:
+
+`text` `parquet`
 
 Please note that the final file name will end with the file_format's suffix; the suffix of a text file is `txt`.
 
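For reference, a minimal job-config sketch using the new value. The `file_format` and `is_enable_transaction` options appear in this commit; the sink plugin name and path below are illustrative and should be checked against the File sink docs:

    sink {
      LocalFile {
        # write parquet files instead of text files
        path = "/tmp/seatunnel/parquet"
        file_format = "parquet"
        # two-phase, transactional file commit (see the note above)
        is_enable_transaction = true
      }
    }
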
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index 2f3c38c51..0bcff2db2 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -43,10 +43,16 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-collections4</artifactId>
         </dependency>
+
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
@@ -63,10 +69,12 @@
             <artifactId>powermock-module-junit4</artifactId>
             <scope>test</scope>
         </dependency>
+
         <dependency>
             <groupId>org.powermock</groupId>
             <artifactId>powermock-api-mockito2</artifactId>
             <scope>test</scope>
         </dependency>
+
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index 6b3f31f79..9b830cd9c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -21,7 +21,8 @@ import java.io.Serializable;
 
 public enum FileFormat implements Serializable {
     CSV("csv"),
-    TEXT("txt");
+    TEXT("txt"),
+    PARQUET("parquet");
 
     private String suffix;
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkTransactionFileNameGenerator.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkTransactionFileNameGenerator.java
index ba005c7de..1c253cc5d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkTransactionFileNameGenerator.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkTransactionFileNameGenerator.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.config.Constant;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
 
+import lombok.Data;
 import lombok.NonNull;
 import org.apache.commons.lang3.StringUtils;
 
@@ -31,6 +32,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+@Data
 public class FileSinkTransactionFileNameGenerator implements TransactionFileNameGenerator {
     private FileFormat fileFormat;
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsFileSinkPlugin.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsFileSinkPlugin.java
index 6e6c9380c..d93c126cd 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsFileSinkPlugin.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsFileSinkPlugin.java
@@ -49,17 +49,9 @@ public class HdfsFileSinkPlugin implements SinkFileSystemPlugin {
                                                                               @NonNull String fieldDelimiter,
                                                                               @NonNull String rowDelimiter,
                                                                               @NonNull FileSystem fileSystem) {
-        return Optional.of(new HdfsTxtTransactionStateFileWriter(seaTunnelRowTypeInfo,
-            transactionFileNameGenerator,
-            partitionDirNameGenerator,
-            sinkColumnsIndexInRow,
-            tmpPath,
-            targetPath,
-            jobId,
-            subTaskIndex,
-            fieldDelimiter,
-            rowDelimiter,
-            fileSystem));
+        // use the factory to create a transaction state file writer that matches the configured file format
+        TransactionStateFileWriter writer = HdfsTransactionStateFileWriteFactory.of(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fieldDelimiter, rowDelimiter, fileSystem);
+        return Optional.of(writer);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsParquetTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsParquetTransactionStateFileWriter.java
new file mode 100644
index 000000000..b309bef7a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsParquetTransactionStateFileWriter.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.hdfs;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HdfsParquetTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
+    private static final Logger LOGGER = LoggerFactory.getLogger(HdfsParquetTransactionStateFileWriter.class);
+    private Map<String, ParquetWriter<GenericRecord>> beingWrittenWriter;
+
+    public HdfsParquetTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+                                                 @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+                                                 @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+                                                 @NonNull List<Integer> sinkColumnsIndexInRow, @NonNull String tmpPath,
+                                                 @NonNull String targetPath,
+                                                 @NonNull String jobId,
+                                                 int subTaskIndex,
+                                                 @NonNull FileSystem fileSystem) {
+        super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
+        beingWrittenWriter = new HashMap<>();
+    }
+
+    @Override
+    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        ParquetWriter<GenericRecord> writer = getOrCreateWriter(filePath);
+        Schema schema = buildSchemaWithRowType();
+        GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
+        sinkColumnsIndexInRow.forEach(index -> {
+            if (seaTunnelRowTypeInfo.getFieldType(index).equals(BasicType.STRING_TYPE)) {
+                recordBuilder.set(seaTunnelRowTypeInfo.getFieldName(index), seaTunnelRow.getField(index).toString());
+            } else {
+                recordBuilder.set(seaTunnelRowTypeInfo.getFieldName(index), seaTunnelRow.getField(index));
+            }
+        });
+        GenericData.Record record = recordBuilder.build();
+        try {
+            writer.write(record);
+        } catch (IOException e) {
+            String errorMsg = String.format("Write data to file [%s] error", filePath);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
+    @Override
+    public void finishAndCloseWriteFile() {
+        this.beingWrittenWriter.forEach((k, v) -> {
+            try {
+                v.close();
+            } catch (IOException e) {
+                String errorMsg = String.format("Close file [%s] parquet writer failed, error msg: [%s]", k, e.getMessage());
+                throw new RuntimeException(errorMsg, e);
+            }
+            needMoveFiles.put(k, getTargetLocation(k));
+        });
+    }
+
+    @Override
+    public void beginTransaction(String transactionId) {
+        this.beingWrittenWriter = new HashMap<>();
+    }
+
+    @Override
+    public void abortTransaction(String transactionId) {
+        this.beingWrittenWriter = new HashMap<>();
+    }
+
+    private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath) {
+        ParquetWriter<GenericRecord> writer = this.beingWrittenWriter.get(filePath);
+        if (writer == null) {
+            Schema schema = buildSchemaWithRowType();
+            Path path = new Path(filePath);
+            try {
+                HadoopOutputFile outputFile = HadoopOutputFile.fromPath(path, HdfsUtils.CONF);
+                ParquetWriter<GenericRecord> newWriter = AvroParquetWriter.<GenericRecord>builder(outputFile)
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        // use parquet v1 to improve compatibility
+                        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
+                        // temporarily use snappy compression;
+                        // a compression option in the sink config could drive this later
+                        .withCompressionCodec(CompressionCodecName.SNAPPY)
+                        .withSchema(schema)
+                        .build();
+                this.beingWrittenWriter.put(filePath, newWriter);
+                return newWriter;
+            } catch (IOException e) {
+                String errorMsg = String.format("Get parquet writer for file [%s] error", filePath);
+                throw new RuntimeException(errorMsg, e);
+            }
+        }
+        return writer;
+    }
+
+    private Schema buildSchemaWithRowType() {
+        ArrayList<Schema.Field> fields = new ArrayList<>();
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowTypeInfo.getFieldTypes();
+        String[] fieldNames = seaTunnelRowTypeInfo.getFieldNames();
+        sinkColumnsIndexInRow.forEach(index -> {
+            if (BasicType.BOOLEAN_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.BOOLEAN), null, null);
+                fields.add(field);
+            } else if (BasicType.SHORT_TYPE.equals(fieldTypes[index]) || BasicType.INT_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.INT), null, null);
+                fields.add(field);
+            } else if (BasicType.LONG_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.LONG), null, null);
+                fields.add(field);
+            } else if (BasicType.FLOAT_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.FLOAT), null, null);
+                fields.add(field);
+            } else if (BasicType.DOUBLE_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.DOUBLE), null, null);
+                fields.add(field);
+            } else if (BasicType.STRING_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.STRING), null, null);
+                fields.add(field);
+            } else if (BasicType.BYTE_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.BYTES), null, null);
+                fields.add(field);
+            } else if (BasicType.VOID_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.NULL), null, null);
+                fields.add(field);
+            }
+        });
+        return Schema.createRecord("SeatunnelRecord",
+                "The record generated by seatunnel file connector",
+                "org.apache.parquet.avro",
+                false,
+                fields);
+    }
+}
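
The new writer follows the standard parquet-avro pattern: build an Avro schema, open an AvroParquetWriter over a HadoopOutputFile, write GenericRecords, and close to flush the footer. A self-contained sketch of that same pattern, with an illustrative local path and field names (not part of this commit):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.GenericRecordBuilder;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.column.ParquetProperties;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.hadoop.util.HadoopOutputFile;

    public class ParquetWriteSketch {
        public static void main(String[] args) throws Exception {
            // a schema equivalent to what buildSchemaWithRowType() produces
            Schema schema = SchemaBuilder.record("SeatunnelRecord")
                    .namespace("org.apache.parquet.avro")
                    .fields()
                    .requiredString("name")
                    .requiredInt("age")
                    .endRecord();
            Path path = new Path("/tmp/seatunnel-sketch.parquet");
            // same builder options as the writer above: overwrite, parquet v1, snappy
            try (ParquetWriter<GenericRecord> writer =
                         AvroParquetWriter.<GenericRecord>builder(
                                 HadoopOutputFile.fromPath(path, new Configuration()))
                                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                                 .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                                 .withCompressionCodec(CompressionCodecName.SNAPPY)
                                 .withSchema(schema)
                                 .build()) {
                GenericRecord record = new GenericRecordBuilder(schema)
                        .set("name", "seatunnel")
                        .set("age", 2)
                        .build();
                writer.write(record);
            } // close() writes the parquet footer; until then the file is incomplete
        }
    }
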
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
new file mode 100644
index 000000000..5cb6db36b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.hdfs;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+
+import java.util.List;
+
+public class HdfsTransactionStateFileWriteFactory {
+
+    private HdfsTransactionStateFileWriteFactory() {}
+
+    public static TransactionStateFileWriter of(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+                                                @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+                                                @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+                                                @NonNull List<Integer> sinkColumnsIndexInRow,
+                                                @NonNull String tmpPath,
+                                                @NonNull String targetPath,
+                                                @NonNull String jobId,
+                                                int subTaskIndex,
+                                                @NonNull String fieldDelimiter,
+                                                @NonNull String rowDelimiter,
+                                                @NonNull FileSystem fileSystem) {
+        FileSinkTransactionFileNameGenerator fileSinkTransactionFileNameGenerator = (FileSinkTransactionFileNameGenerator) transactionFileNameGenerator;
+        FileFormat fileFormat = fileSinkTransactionFileNameGenerator.getFileFormat();
+        if (fileFormat.equals(FileFormat.CSV)) {
+            // TODO: replace this with a real csv writer once issue #2133 is closed
+            return new HdfsTxtTransactionStateFileWriter(
+                    seaTunnelRowTypeInfo,
+                    transactionFileNameGenerator,
+                    partitionDirNameGenerator,
+                    sinkColumnsIndexInRow,
+                    tmpPath,
+                    targetPath,
+                    jobId,
+                    subTaskIndex,
+                    fieldDelimiter,
+                    rowDelimiter,
+                    fileSystem);
+        }
+        if (fileFormat.equals(FileFormat.PARQUET)) {
+            return new HdfsParquetTransactionStateFileWriter(
+                    seaTunnelRowTypeInfo,
+                    transactionFileNameGenerator,
+                    partitionDirNameGenerator,
+                    sinkColumnsIndexInRow,
+                    tmpPath,
+                    targetPath,
+                    jobId,
+                    subTaskIndex,
+                    fileSystem);
+        }
+        // fall back to the default txt writer for file types the connector does not support
+        return new HdfsTxtTransactionStateFileWriter(
+                    seaTunnelRowTypeInfo,
+                    transactionFileNameGenerator,
+                    partitionDirNameGenerator,
+                    sinkColumnsIndexInRow,
+                    tmpPath,
+                    targetPath,
+                    jobId,
+                    subTaskIndex,
+                    fieldDelimiter,
+                    rowDelimiter,
+                    fileSystem);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsUtils.java
index 421c7f7eb..97a1429c3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsUtils.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsUtils.java
@@ -37,14 +37,19 @@ public class HdfsUtils {
 
     public static final int WRITE_BUFFER_SIZE = 2048;
 
-    public static FileSystem getHdfsFs(@NonNull String path)
-        throws IOException {
-        Configuration conf = new Configuration();
+    public static final Configuration CONF = new Configuration();
+
+    // make the configuration object static so the orc and parquet writers can reuse it
+    static {
         LOGGER.info(System.getenv("HADOOP_CONF_DIR"));
-        conf.addResource(new Path(System.getenv("HADOOP_CONF_DIR") + "/core-site.xml"));
-        conf.addResource(new Path(System.getenv("HADOOP_CONF_DIR") + "/hdfs-site.xml"));
-        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
-        return FileSystem.get(URI.create(path), conf);
+        CONF.addResource(new Path(System.getenv("HADOOP_CONF_DIR") + "/core-site.xml"));
+        CONF.addResource(new Path(System.getenv("HADOOP_CONF_DIR") + "/hdfs-site.xml"));
+        CONF.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+    }
+
+    public static FileSystem getHdfsFs(@NonNull String path)
+            throws IOException {
+        return FileSystem.get(URI.create(path), CONF);
     }
 
     public static FSDataOutputStream getOutputStream(@NonNull String outFilePath) throws IOException {
@@ -99,9 +104,7 @@ public class HdfsUtils {
         }
     }
 
-    public static void createDir(@NonNull String filePath)
-        throws IOException {
-
+    public static void createDir(@NonNull String filePath) throws IOException {
         FileSystem hdfsFs = getHdfsFs(filePath);
         Path dfs = new Path(filePath);
         if (!hdfsFs.mkdirs(dfs)) {
@@ -109,8 +112,7 @@ public class HdfsUtils {
         }
     }
 
-    public static boolean fileExist(@NonNull String filePath)
-        throws IOException {
+    public static boolean fileExist(@NonNull String filePath) throws IOException {
         FileSystem hdfsFs = getHdfsFs(filePath);
         Path fileName = new Path(filePath);
         return hdfsFs.exists(fileName);
@@ -119,8 +121,7 @@ public class HdfsUtils {
     /**
      * get the dir in filePath
      */
-    public static List<Path> dirList(@NonNull String filePath)
-        throws FileNotFoundException, IOException {
+    public static List<Path> dirList(@NonNull String filePath) throws FileNotFoundException, IOException {
         FileSystem hdfsFs = getHdfsFs(filePath);
         List<Path> pathList = new ArrayList<Path>();
         Path fileName = new Path(filePath);
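
Hoisting the Configuration into a static CONF field means core-site.xml and hdfs-site.xml are loaded once and the same settings are visible to every writer. A minimal sketch, assuming a reachable HDFS; the URI is illustrative and the import path follows this commit's package layout:

    import java.net.URI;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.seatunnel.connectors.seatunnel.file.sink.hdfs.HdfsUtils;

    public class SharedConfSketch {
        public static void main(String[] args) throws Exception {
            // HdfsUtils.CONF was populated once in the static initializer,
            // so this handle reuses the same settings as the parquet writer
            FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:8020/"), HdfsUtils.CONF);
            System.out.println(fs.exists(new Path("/tmp")));
        }
    }
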
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
index 1ac5bb77b..90797614c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
@@ -35,6 +35,12 @@
             <artifactId>connector-file-base</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop-2</artifactId>
+            <version>${flink-shaded-hadoop-2.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalFileSinkPlugin.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalFileSinkPlugin.java
index 1d4bc43e5..a50cfe263 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalFileSinkPlugin.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalFileSinkPlugin.java
@@ -49,17 +49,9 @@ public class LocalFileSinkPlugin implements SinkFileSystemPlugin {
                                                                               @NonNull String fieldDelimiter,
                                                                               @NonNull String rowDelimiter,
                                                                               @NonNull FileSystem fileSystem) {
-        return Optional.of(new LocalTxtTransactionStateFileWriter(seaTunnelRowTypeInfo,
-            transactionFileNameGenerator,
-            partitionDirNameGenerator,
-            sinkColumnsIndexInRow,
-            tmpPath,
-            targetPath,
-            jobId,
-            subTaskIndex,
-            fieldDelimiter,
-            rowDelimiter,
-            fileSystem));
+        // use the factory to create a transaction state file writer that matches the configured file format
+        TransactionStateFileWriter writer = LocalTransactionStateFileWriteFactory.of(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fieldDelimiter, rowDelimiter, fileSystem);
+        return Optional.of(writer);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalParquetTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalParquetTransactionStateFileWriter.java
new file mode 100644
index 000000000..ec5833e4a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalParquetTransactionStateFileWriter.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.local;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class LocalParquetTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
+    private static final Logger LOGGER = LoggerFactory.getLogger(LocalParquetTransactionStateFileWriter.class);
+    private final Configuration configuration = new Configuration();
+    private Map<String, ParquetWriter<GenericRecord>> beingWrittenWriter;
+
+    public LocalParquetTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+                                                 @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+                                                 @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+                                                 @NonNull List<Integer> sinkColumnsIndexInRow, @NonNull String tmpPath,
+                                                 @NonNull String targetPath,
+                                                 @NonNull String jobId,
+                                                 int subTaskIndex,
+                                                 @NonNull FileSystem fileSystem) {
+        super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
+        beingWrittenWriter = new HashMap<>();
+    }
+
+    @Override
+    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        ParquetWriter<GenericRecord> writer = getOrCreateWriter(filePath);
+        Schema schema = buildSchemaWithRowType();
+        GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
+        sinkColumnsIndexInRow.forEach(index -> {
+            if (seaTunnelRowTypeInfo.getFieldType(index).equals(BasicType.STRING_TYPE)) {
+                recordBuilder.set(seaTunnelRowTypeInfo.getFieldName(index), seaTunnelRow.getField(index).toString());
+            } else {
+                recordBuilder.set(seaTunnelRowTypeInfo.getFieldName(index), seaTunnelRow.getField(index));
+            }
+        });
+        GenericData.Record record = recordBuilder.build();
+        try {
+            writer.write(record);
+        } catch (IOException e) {
+            String errorMsg = String.format("Write data to parquet file [%s] error", filePath);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
+    @Override
+    public void finishAndCloseWriteFile() {
+        this.beingWrittenWriter.forEach((k, v) -> {
+            try {
+                v.close();
+            } catch (IOException e) {
+                String errorMsg = String.format("Close file [%s] parquet writer failed, error msg: [%s]", k, e.getMessage());
+                throw new RuntimeException(errorMsg, e);
+            }
+            needMoveFiles.put(k, getTargetLocation(k));
+        });
+    }
+
+    @Override
+    public void beginTransaction(String transactionId) {
+        this.beingWrittenWriter = new HashMap<>();
+    }
+
+    @Override
+    public void abortTransaction(String transactionId) {
+        this.beingWrittenWriter = new HashMap<>();
+    }
+
+    private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath) {
+        ParquetWriter<GenericRecord> writer = this.beingWrittenWriter.get(filePath);
+        if (writer == null) {
+            Schema schema = buildSchemaWithRowType();
+            Path path = new Path(filePath);
+            try {
+                // an empty configuration object is all that is needed to write to the local file system
+                HadoopOutputFile outputFile = HadoopOutputFile.fromPath(path, configuration);
+                ParquetWriter<GenericRecord> newWriter = AvroParquetWriter.<GenericRecord>builder(outputFile)
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        // use parquet v1 to improve compatibility
+                        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
+                        // temporarily use snappy compression;
+                        // a compression option in the sink config could drive this later
+                        .withCompressionCodec(CompressionCodecName.SNAPPY)
+                        .withSchema(schema)
+                        .build();
+                this.beingWrittenWriter.put(filePath, newWriter);
+                return newWriter;
+            } catch (IOException e) {
+                String errorMsg = String.format("Get parquet writer for file [%s] error", filePath);
+                throw new RuntimeException(errorMsg, e);
+            }
+        }
+        return writer;
+    }
+
+    private Schema buildSchemaWithRowType() {
+        ArrayList<Schema.Field> fields = new ArrayList<>();
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowTypeInfo.getFieldTypes();
+        String[] fieldNames = seaTunnelRowTypeInfo.getFieldNames();
+        sinkColumnsIndexInRow.forEach(index -> {
+            if (BasicType.BOOLEAN_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.BOOLEAN), null, null);
+                fields.add(field);
+            } else if (BasicType.SHORT_TYPE.equals(fieldTypes[index]) || BasicType.INT_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.INT), null, null);
+                fields.add(field);
+            } else if (BasicType.LONG_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.LONG), null, null);
+                fields.add(field);
+            } else if (BasicType.FLOAT_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.FLOAT), null, null);
+                fields.add(field);
+            } else if (BasicType.DOUBLE_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.DOUBLE), null, null);
+                fields.add(field);
+            } else if (BasicType.STRING_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.STRING), null, null);
+                fields.add(field);
+            } else if (BasicType.BYTE_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.BYTES), null, null);
+                fields.add(field);
+            } else if (BasicType.VOID_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.NULL), null, null);
+                fields.add(field);
+            }
+        });
+        return Schema.createRecord("SeatunnelRecord",
+                "The record generated by seatunnel file connector",
+                "org.apache.parquet.avro",
+                false,
+                fields);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
new file mode 100644
index 000000000..c4244bfc0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.local;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+
+import java.util.List;
+
+public class LocalTransactionStateFileWriteFactory {
+
+    private LocalTransactionStateFileWriteFactory() {}
+
+    public static TransactionStateFileWriter of(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+                                                @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+                                                @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+                                                @NonNull List<Integer> sinkColumnsIndexInRow,
+                                                @NonNull String tmpPath,
+                                                @NonNull String targetPath,
+                                                @NonNull String jobId,
+                                                int subTaskIndex,
+                                                @NonNull String fieldDelimiter,
+                                                @NonNull String rowDelimiter,
+                                                @NonNull FileSystem fileSystem) {
+        FileSinkTransactionFileNameGenerator fileSinkTransactionFileNameGenerator = (FileSinkTransactionFileNameGenerator) transactionFileNameGenerator;
+        FileFormat fileFormat = fileSinkTransactionFileNameGenerator.getFileFormat();
+        if (fileFormat.equals(FileFormat.CSV)) {
+            // TODO: replace this with a real csv writer once issue #2133 is closed
+            return new LocalTxtTransactionStateFileWriter(
+                    seaTunnelRowTypeInfo,
+                    transactionFileNameGenerator,
+                    partitionDirNameGenerator,
+                    sinkColumnsIndexInRow,
+                    tmpPath,
+                    targetPath,
+                    jobId,
+                    subTaskIndex,
+                    fieldDelimiter,
+                    rowDelimiter,
+                    fileSystem);
+        }
+        if (fileFormat.equals(FileFormat.PARQUET)) {
+            return new LocalParquetTransactionStateFileWriter(
+                    seaTunnelRowTypeInfo,
+                    transactionFileNameGenerator,
+                    partitionDirNameGenerator,
+                    sinkColumnsIndexInRow,
+                    tmpPath,
+                    targetPath,
+                    jobId,
+                    subTaskIndex,
+                    fileSystem);
+        }
+        // fall back to the default txt writer for file types the connector does not support
+        return new LocalTxtTransactionStateFileWriter(
+                    seaTunnelRowTypeInfo,
+                    transactionFileNameGenerator,
+                    partitionDirNameGenerator,
+                    sinkColumnsIndexInRow,
+                    tmpPath,
+                    targetPath,
+                    jobId,
+                    subTaskIndex,
+                    fieldDelimiter,
+                    rowDelimiter,
+                    fileSystem);
+    }
+}