Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/05 02:29:02 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Support orc file format in file connector (#2369)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f44fe1e03 [Feature][Connector-V2] Support orc file format in file connector (#2369)
f44fe1e03 is described below

commit f44fe1e033221cdf0cd1c8c2c966e7e6bd81ce56
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Fri Aug 5 10:28:57 2022 +0800

    [Feature][Connector-V2] Support orc file format in file connector (#2369)
    
    * [Feature][Connector-V2] Add orc-core dependency in connector-file-base (#2130)
    
    * [Feature][Connector-V2] Add orc enum in FileFormat (#2130)
    
    * [Feature][Connector-V2] Add orc file writer in connector-file-hadoop (#2130)
    
    * [Feature][Connector-V2] Add orc file writer in connector-file-local (#2130)
    
    * optimize file sink doc (#2363)
    
    * [Feature][Connector-V2] Update doc for file connectors (#2130)
    
    * [Feature][Connector-V2] Resolve hadoop class conflicts (#2130)
    
    Co-authored-by: Eric <ga...@gmail.com>
---
 docs/en/connector-v2/sink/HdfsFile.md              |  22 +-
 docs/en/connector-v2/sink/LocalFile.md             |  22 +-
 .../connector-file/connector-file-base/pom.xml     |  11 +
 .../seatunnel/file/config/FileFormat.java          |   3 +-
 .../hdfs/HdfsOrcTransactionStateFileWriter.java    | 243 ++++++++++++++++++++
 .../hdfs/HdfsTransactionStateFileWriteFactory.java |  12 +
 .../local/LocalOrcTransactionStateFileWriter.java  | 245 +++++++++++++++++++++
 .../LocalTransactionStateFileWriteFactory.java     |  12 +
 8 files changed, 567 insertions(+), 3 deletions(-)

diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index 36d9f6b35..b3d692c33 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -38,7 +38,7 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
 
 We support the following file types:
 
-`text` `csv` `parquet`
+`text` `csv` `parquet` `orc`
 
 Please note that the final file name will end with the file_format's suffix; the suffix of a text file is `txt`.
 
@@ -104,6 +104,8 @@ For the specific meaning of each mode, see [save-modes](https://spark.apache.org
 
 ## Example
 
+For text file format
+
 ```bash
 
 HdfsFile {
@@ -139,3 +141,21 @@ HdfsFile {
 }
 
 ```
+
+For orc file format
+
+```bash
+
+HdfsFile {
+    path="hdfs://mycluster/tmp/hive/warehouse/test2"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="orc"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+}
+
+```
diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md
index ffb0efb8c..c8ef79305 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -36,7 +36,7 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
 
 We support the following file types:
 
-`text` `csv` `parquet`
+`text` `csv` `parquet` `orc`
 
 Please note that the final file name will end with the file_format's suffix; the suffix of a text file is `txt`.
 
@@ -102,6 +102,8 @@ For the specific meaning of each mode, see [save-modes](https://spark.apache.org
 
 ## Example
 
+For text file format
+
 ```bash
 
 LocalFile {
@@ -137,3 +139,21 @@ LocalFile {
 }
 
 ```
+
+For orc file format
+
+```bash
+
+LocalFile {
+    path="file:///tmp/hive/warehouse/test2"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="orc"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+}
+
+```
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index 9473556eb..b046c2278 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -48,6 +48,17 @@
             <artifactId>parquet-avro</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.orc</groupId>
+            <artifactId>orc-core</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>hadoop-common</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-collections4</artifactId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index 9b830cd9c..c02f58168 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -22,7 +22,8 @@ import java.io.Serializable;
 public enum FileFormat implements Serializable {
     CSV("csv"),
     TEXT("txt"),
-    PARQUET("parquet");
+    PARQUET("parquet"),
+    ORC("orc");
 
     private String suffix;
 
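The docs above say the final file name ends with the file_format's suffix; that suffix is what this enum's `suffix` field carries. A hedged sketch of the relationship (the `getSuffix()` accessor and the base file name are assumptions for illustration; only the constants and the field appear in this hunk):

```java
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;

// Illustrative only: resolve the configured file_format to its suffix.
// getSuffix() and the base name below are assumed, not shown in the diff.
public class SuffixSketch {
    public static void main(String[] args) {
        FileFormat format = FileFormat.valueOf("ORC");
        String baseName = "T_1659666537_0_1_0";                    // assumed file_name_expression result
        System.out.println(baseName + "." + format.getSuffix());   // file name ends with ".orc"
    }
}
```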
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsOrcTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsOrcTransactionStateFileWriter.java
new file mode 100644
index 000000000..0bad801e5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsOrcTransactionStateFileWriter.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.hdfs;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HdfsOrcTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
+    private static final Logger LOGGER = LoggerFactory.getLogger(HdfsOrcTransactionStateFileWriter.class);
+    private Map<String, Writer> beingWrittenWriter;
+
+    public HdfsOrcTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo, @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+                                             @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+                                             @NonNull List<Integer> sinkColumnsIndexInRow,
+                                             @NonNull String tmpPath,
+                                             @NonNull String targetPath,
+                                             @NonNull String jobId,
+                                             int subTaskIndex,
+                                             @NonNull FileSystem fileSystem) {
+        super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
+        this.beingWrittenWriter = new HashMap<>();
+    }
+
+    @Override
+    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        Writer writer = getOrCreateWriter(filePath);
+        TypeDescription schema = buildSchemaWithRowType();
+        VectorizedRowBatch rowBatch = schema.createRowBatch();
+        int i = 0;
+        int row = rowBatch.size++;
+        for (Integer index : sinkColumnsIndexInRow) {
+            Object value = seaTunnelRow.getField(index);
+            ColumnVector vector = rowBatch.cols[i];
+            setColumn(value, vector, row);
+            i++;
+        }
+        try {
+            writer.addRowBatch(rowBatch);
+            rowBatch.reset();
+        } catch (IOException e) {
+            String errorMsg = String.format("Write data to orc file [%s] error", filePath);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
+    @Override
+    public void finishAndCloseWriteFile() {
+        this.beingWrittenWriter.forEach((k, v) -> {
+            try {
+                v.close();
+            } catch (IOException e) {
+                String errorMsg = String.format("Close file [%s] orc writer failed, error msg: [%s]", k, e.getMessage());
+                throw new RuntimeException(errorMsg, e);
+            } catch (NullPointerException e) {
+                // The orc writer does not support being closed more than once; a second
+                // close throws a NullPointerException. A whole file sink process goes
+                // through four stages:
+                // 1. beginTransaction 2. prepareCommit 3. commit 4. close
+                // The first stage closes no writers; from the second stage on they are closed.
+                // The last stage closes none either, so each orc writer sees one extra close.
+                LOGGER.info("Close file [{}] orc writer", k);
+            }
+            needMoveFiles.put(k, getTargetLocation(k));
+        });
+    }
+
+    @Override
+    public void beginTransaction(String transactionId) {
+        this.beingWrittenWriter = new HashMap<>();
+    }
+
+    @Override
+    public void abortTransaction(String transactionId) {
+        this.beingWrittenWriter = new HashMap<>();
+    }
+
+    private Writer getOrCreateWriter(@NonNull String filePath) {
+        Writer writer = this.beingWrittenWriter.get(filePath);
+        if (writer == null) {
+            TypeDescription schema = buildSchemaWithRowType();
+            Path path = new Path(filePath);
+            try {
+                OrcFile.WriterOptions options = OrcFile.writerOptions(HdfsUtils.CONF)
+                        .setSchema(schema)
+                        // temporarily use snappy compression
+                        .compress(CompressionKind.SNAPPY)
+                        // use orc version 0.12
+                        .version(OrcFile.Version.V_0_12)
+                        .overwrite(true);
+                Writer newWriter = OrcFile.createWriter(path, options);
+                this.beingWrittenWriter.put(filePath, newWriter);
+                return newWriter;
+            } catch (IOException e) {
+                String errorMsg = String.format("Get orc writer for file [%s] error", filePath);
+                throw new RuntimeException(errorMsg, e);
+            }
+        }
+        return writer;
+    }
+
+    private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
+        if (BasicType.BOOLEAN_TYPE.equals(type)) {
+            return TypeDescription.createBoolean();
+        }
+        if (BasicType.SHORT_TYPE.equals(type)) {
+            return TypeDescription.createShort();
+        }
+        if (BasicType.INT_TYPE.equals(type)) {
+            return TypeDescription.createInt();
+        }
+        if (BasicType.LONG_TYPE.equals(type)) {
+            return TypeDescription.createLong();
+        }
+        if (BasicType.FLOAT_TYPE.equals(type)) {
+            return TypeDescription.createFloat();
+        }
+        if (BasicType.DOUBLE_TYPE.equals(type)) {
+            return TypeDescription.createDouble();
+        }
+        if (BasicType.BYTE_TYPE.equals(type)) {
+            return TypeDescription.createByte();
+        }
+        return TypeDescription.createString();
+    }
+
+    private TypeDescription buildSchemaWithRowType() {
+        TypeDescription schema = TypeDescription.createStruct();
+        for (Integer i : sinkColumnsIndexInRow) {
+            TypeDescription fieldType = buildFieldWithRowType(seaTunnelRowTypeInfo.getFieldType(i));
+            schema.addField(seaTunnelRowTypeInfo.getFieldName(i), fieldType);
+        }
+        return schema;
+    }
+
+    private void setColumn(Object value, ColumnVector vector, int row) {
+        if (value == null) {
+            vector.isNull[row] = true;
+            vector.noNulls = false;
+        } else {
+            switch (vector.type) {
+                case LONG:
+                    LongColumnVector longVector = (LongColumnVector) vector;
+                    setLongColumnVector(value, longVector, row);
+                    break;
+                case DOUBLE:
+                    DoubleColumnVector doubleColumnVector = (DoubleColumnVector) vector;
+                    setDoubleVector(value, doubleColumnVector, row);
+                    break;
+                case BYTES:
+                    BytesColumnVector bytesColumnVector = (BytesColumnVector) vector;
+                    setByteColumnVector(value, bytesColumnVector, row);
+                    break;
+                default:
+                    throw new RuntimeException("Unexpected ColumnVector subtype");
+            }
+        }
+    }
+
+    private void setLongColumnVector(Object value, LongColumnVector longVector, int row) {
+        if (value instanceof Boolean) {
+            Boolean bool = (Boolean) value;
+            longVector.vector[row] = bool ? 1L : 0L;
+        } else if (value instanceof Integer) {
+            longVector.vector[row] = (Integer) value;
+        } else if (value instanceof Long) {
+            longVector.vector[row] = (Long) value;
+        } else if (value instanceof BigInteger) {
+            BigInteger bigInt = (BigInteger) value;
+            longVector.vector[row] = bigInt.longValue();
+        } else {
+            throw new RuntimeException("Boolean, Integer, Long or BigInteger type expected for field");
+        }
+    }
+
+    private void setByteColumnVector(Object value, BytesColumnVector bytesColVector, int rowNum) {
+        if (value instanceof byte[] || value instanceof String) {
+            byte[] byteVec;
+            if (value instanceof String) {
+                String strVal = (String) value;
+                byteVec = strVal.getBytes(StandardCharsets.UTF_8);
+            } else {
+                byteVec = (byte[]) value;
+            }
+            bytesColVector.setRef(rowNum, byteVec, 0, byteVec.length);
+        } else {
+            throw new RuntimeException("byte[] or String type expected for field");
+        }
+    }
+
+    private void setDoubleVector(Object value, DoubleColumnVector doubleVector, int rowNum) {
+        if (value instanceof Double) {
+            doubleVector.vector[rowNum] = (Double) value;
+        } else if (value instanceof Float) {
+            Float floatValue = (Float) value;
+            doubleVector.vector[rowNum] = floatValue.doubleValue();
+        } else {
+            throw new RuntimeException("Double or Float type expected for field");
+        }
+    }
+}
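Amid the lifecycle plumbing above, the core ORC write path is easy to miss. Below is a minimal, self-contained sketch of that same path outside the connector, using only API calls that appear in the diff; the output path, schema, and values are illustrative only:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class OrcWriteSketch {
    public static void main(String[] args) throws IOException {
        // struct<name:string,age:int>, mirroring the docs' sink_columns example
        TypeDescription schema = TypeDescription.createStruct()
                .addField("name", TypeDescription.createString())
                .addField("age", TypeDescription.createInt());
        // Same writer options the connector uses: snappy compression, ORC version 0.12
        Writer writer = OrcFile.createWriter(new Path("/tmp/sketch.orc"),
                OrcFile.writerOptions(new Configuration())
                        .setSchema(schema)
                        .compress(CompressionKind.SNAPPY)
                        .version(OrcFile.Version.V_0_12)
                        .overwrite(true));
        VectorizedRowBatch batch = schema.createRowBatch();
        int row = batch.size++;
        byte[] name = "seatunnel".getBytes(StandardCharsets.UTF_8);
        // strings go through BytesColumnVector, ints through LongColumnVector
        ((BytesColumnVector) batch.cols[0]).setRef(row, name, 0, name.length);
        ((LongColumnVector) batch.cols[1]).vector[row] = 18L;
        writer.addRowBatch(batch);
        batch.reset();
        writer.close();
    }
}
```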
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
index 5cb6db36b..f4c5cd840 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
@@ -73,6 +73,18 @@ public class HdfsTransactionStateFileWriteFactory {
                     subTaskIndex,
                     fileSystem);
         }
+        if (fileFormat.equals(FileFormat.ORC)) {
+            return new HdfsOrcTransactionStateFileWriter(
+                    seaTunnelRowTypeInfo,
+                    transactionFileNameGenerator,
+                    partitionDirNameGenerator,
+                    sinkColumnsIndexInRow,
+                    tmpPath,
+                    targetPath,
+                    jobId,
+                    subTaskIndex,
+                    fileSystem);
+        }
         // if the file type is not supported by the file connector, a default txt writer will be generated
         return new HdfsTxtTransactionStateFileWriter(
                     seaTunnelRowTypeInfo,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalOrcTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalOrcTransactionStateFileWriter.java
new file mode 100644
index 000000000..19ef12f68
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalOrcTransactionStateFileWriter.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.local;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class LocalOrcTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
+    private static final Logger LOGGER = LoggerFactory.getLogger(LocalOrcTransactionStateFileWriter.class);
+    private final Configuration configuration = new Configuration();
+    private Map<String, Writer> beingWrittenWriter;
+
+    public LocalOrcTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo, @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+                                              @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+                                              @NonNull List<Integer> sinkColumnsIndexInRow,
+                                              @NonNull String tmpPath,
+                                              @NonNull String targetPath,
+                                              @NonNull String jobId,
+                                              int subTaskIndex,
+                                              @NonNull FileSystem fileSystem) {
+        super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
+        this.beingWrittenWriter = new HashMap<>();
+    }
+
+    @Override
+    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        Writer writer = getOrCreateWriter(filePath);
+        TypeDescription schema = buildSchemaWithRowType();
+        VectorizedRowBatch rowBatch = schema.createRowBatch();
+        int i = 0;
+        int row = rowBatch.size++;
+        for (Integer index : sinkColumnsIndexInRow) {
+            Object value = seaTunnelRow.getField(index);
+            ColumnVector vector = rowBatch.cols[i];
+            setColumn(value, vector, row);
+            i++;
+        }
+        try {
+            writer.addRowBatch(rowBatch);
+            rowBatch.reset();
+        } catch (IOException e) {
+            String errorMsg = String.format("Write data to orc file [%s] error", filePath);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
+    @Override
+    public void finishAndCloseWriteFile() {
+        this.beingWrittenWriter.forEach((k, v) -> {
+            try {
+                v.close();
+            } catch (IOException e) {
+                String errorMsg = String.format("Close file [%s] orc writer failed, error msg: [%s]", k, e.getMessage());
+                throw new RuntimeException(errorMsg, e);
+            } catch (NullPointerException e) {
+                // The orc writer does not support being closed more than once; a second
+                // close throws a NullPointerException. A whole file sink process goes
+                // through four stages:
+                // 1. beginTransaction 2. prepareCommit 3. commit 4. close
+                // The first stage closes no writers; from the second stage on they are closed.
+                // The last stage closes none either, so each orc writer sees one extra close.
+                LOGGER.info("Close file [{}] orc writer", k);
+            }
+            needMoveFiles.put(k, getTargetLocation(k));
+        });
+    }
+
+    @Override
+    public void beginTransaction(String transactionId) {
+        this.beingWrittenWriter = new HashMap<>();
+    }
+
+    @Override
+    public void abortTransaction(String transactionId) {
+        this.beingWrittenWriter = new HashMap<>();
+    }
+
+    private Writer getOrCreateWriter(@NonNull String filePath) {
+        Writer writer = this.beingWrittenWriter.get(filePath);
+        if (writer == null) {
+            TypeDescription schema = buildSchemaWithRowType();
+            Path path = new Path(filePath);
+            try {
+                OrcFile.WriterOptions options = OrcFile.writerOptions(configuration)
+                        .setSchema(schema)
+                        // temporarily use snappy compression
+                        .compress(CompressionKind.SNAPPY)
+                        // use orc version 0.12
+                        .version(OrcFile.Version.V_0_12)
+                        .overwrite(true);
+                Writer newWriter = OrcFile.createWriter(path, options);
+                this.beingWrittenWriter.put(filePath, newWriter);
+                return newWriter;
+            } catch (IOException e) {
+                String errorMsg = String.format("Get orc writer for file [%s] error", filePath);
+                throw new RuntimeException(errorMsg, e);
+            }
+        }
+        return writer;
+    }
+
+    private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
+        if (BasicType.BOOLEAN_TYPE.equals(type)) {
+            return TypeDescription.createBoolean();
+        }
+        if (BasicType.SHORT_TYPE.equals(type)) {
+            return TypeDescription.createShort();
+        }
+        if (BasicType.INT_TYPE.equals(type)) {
+            return TypeDescription.createInt();
+        }
+        if (BasicType.LONG_TYPE.equals(type)) {
+            return TypeDescription.createLong();
+        }
+        if (BasicType.FLOAT_TYPE.equals(type)) {
+            return TypeDescription.createFloat();
+        }
+        if (BasicType.DOUBLE_TYPE.equals(type)) {
+            return TypeDescription.createDouble();
+        }
+        if (BasicType.BYTE_TYPE.equals(type)) {
+            return TypeDescription.createByte();
+        }
+        return TypeDescription.createString();
+    }
+
+    private TypeDescription buildSchemaWithRowType() {
+        TypeDescription schema = TypeDescription.createStruct();
+        for (Integer i : sinkColumnsIndexInRow) {
+            TypeDescription fieldType = buildFieldWithRowType(seaTunnelRowTypeInfo.getFieldType(i));
+            schema.addField(seaTunnelRowTypeInfo.getFieldName(i), fieldType);
+        }
+        return schema;
+    }
+
+    private void setColumn(Object value, ColumnVector vector, int row) {
+        if (value == null) {
+            vector.isNull[row] = true;
+            vector.noNulls = false;
+        } else {
+            switch (vector.type) {
+                case LONG:
+                    LongColumnVector longVector = (LongColumnVector) vector;
+                    setLongColumnVector(value, longVector, row);
+                    break;
+                case DOUBLE:
+                    DoubleColumnVector doubleColumnVector = (DoubleColumnVector) vector;
+                    setDoubleVector(value, doubleColumnVector, row);
+                    break;
+                case BYTES:
+                    BytesColumnVector bytesColumnVector = (BytesColumnVector) vector;
+                    setByteColumnVector(value, bytesColumnVector, row);
+                    break;
+                default:
+                    throw new RuntimeException("Unexpected ColumnVector subtype");
+            }
+        }
+    }
+
+    private void setLongColumnVector(Object value, LongColumnVector longVector, int row) {
+        if (value instanceof Boolean) {
+            Boolean bool = (Boolean) value;
+            longVector.vector[row] = bool ? 1L : 0L;
+        } else if (value instanceof Integer) {
+            longVector.vector[row] = (Integer) value;
+        } else if (value instanceof Long) {
+            longVector.vector[row] = (Long) value;
+        } else if (value instanceof BigInteger) {
+            BigInteger bigInt = (BigInteger) value;
+            longVector.vector[row] = bigInt.longValue();
+        } else {
+            throw new RuntimeException("Boolean, Integer, Long or BigInteger type expected for field");
+        }
+    }
+
+    private void setByteColumnVector(Object value, BytesColumnVector bytesColVector, int rowNum) {
+        if (value instanceof byte[] || value instanceof String) {
+            byte[] byteVec;
+            if (value instanceof String) {
+                String strVal = (String) value;
+                byteVec = strVal.getBytes(StandardCharsets.UTF_8);
+            } else {
+                byteVec = (byte[]) value;
+            }
+            bytesColVector.setRef(rowNum, byteVec, 0, byteVec.length);
+        } else {
+            throw new RuntimeException("byte[] or String type expected for field");
+        }
+    }
+
+    private void setDoubleVector(Object value, DoubleColumnVector doubleVector, int rowNum) {
+        if (value instanceof Double) {
+            doubleVector.vector[rowNum] = (Double) value;
+        } else if (value instanceof Float) {
+            Float floatValue = (Float) value;
+            doubleVector.vector[rowNum] = floatValue.doubleValue();
+        } else {
+            throw new RuntimeException("Double or Float type expected for field");
+        }
+    }
+}
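Both writers tolerate the extra close by catching NullPointerException. A hedged alternative design, not what this commit does, is an idempotent close guard that remembers which files were already closed:

```java
import org.apache.orc.Writer;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Sketch of an idempotent close guard; the class and method names are
// illustrative and not part of this commit.
class OrcWriterCloser {
    private final Set<String> closed = new HashSet<>();

    void closeOnce(String filePath, Writer writer) {
        if (!closed.add(filePath)) {
            return; // already closed once; orc writers must not be closed twice
        }
        try {
            writer.close();
        } catch (IOException e) {
            String errorMsg = String.format("Close file [%s] orc writer failed", filePath);
            throw new RuntimeException(errorMsg, e);
        }
    }
}
```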
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
index c4244bfc0..05e92d5fd 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
@@ -73,6 +73,18 @@ public class LocalTransactionStateFileWriteFactory {
                     subTaskIndex,
                     fileSystem);
         }
+        if (fileFormat.equals(FileFormat.ORC)) {
+            return new LocalOrcTransactionStateFileWriter(
+                    seaTunnelRowTypeInfo,
+                    transactionFileNameGenerator,
+                    partitionDirNameGenerator,
+                    sinkColumnsIndexInRow,
+                    tmpPath,
+                    targetPath,
+                    jobId,
+                    subTaskIndex,
+                    fileSystem);
+        }
         // if the file type is not supported by the file connector, a default txt writer will be generated
         return new LocalTxtTransactionStateFileWriter(
                     seaTunnelRowTypeInfo,