Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/05 02:29:02 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Support orc file format in file connector (#2369)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f44fe1e03 [Feature][Connector-V2] Support orc file format in file connector (#2369)
f44fe1e03 is described below
commit f44fe1e033221cdf0cd1c8c2c966e7e6bd81ce56
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Fri Aug 5 10:28:57 2022 +0800
[Feature][Connector-V2] Support orc file format in file connector (#2369)
* [Feature][Connector-V2] Add orc-core dependency in connector-file-base (#2130)
* [Feature][Connector-V2] Add orc enum in FileFormat (#2130)
* [Feature][Connector-V2] Add orc file writer in connector-file-hadoop (#2130)
* [Feature][Connector-V2] Add orc file writer in connector-file-local (#2130)
* optimize file sink doc (#2363)
* [Feature][Connector-V2] Update doc for file connectors (#2130)
* [Feature][Connector-V2] Resolve hadoop class conflicts (#2130)
Co-authored-by: Eric <ga...@gmail.com>
---
docs/en/connector-v2/sink/HdfsFile.md | 22 +-
docs/en/connector-v2/sink/LocalFile.md | 22 +-
.../connector-file/connector-file-base/pom.xml | 11 +
.../seatunnel/file/config/FileFormat.java | 3 +-
.../hdfs/HdfsOrcTransactionStateFileWriter.java | 243 ++++++++++++++++++++
.../hdfs/HdfsTransactionStateFileWriteFactory.java | 12 +
.../local/LocalOrcTransactionStateFileWriter.java | 245 +++++++++++++++++++++
.../LocalTransactionStateFileWriteFactory.java | 12 +
8 files changed, 567 insertions(+), 3 deletions(-)
diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index 36d9f6b35..b3d692c33 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -38,7 +38,7 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
We support the following file types:
-`text` `csv` `parquet`
+`text` `csv` `parquet` `orc`
Please note that the final file name will end with the file_format's suffix; the suffix of a text file is `txt`.
@@ -104,6 +104,8 @@ For the specific meaning of each mode, see [save-modes](https://spark.apache.org
## Example
+For text file format
+
```bash
HdfsFile {
@@ -139,3 +141,21 @@ HdfsFile {
}
```
+
+For orc file format
+
+```bash
+
+HdfsFile {
+ path="hdfs://mycluster/tmp/hive/warehouse/test2"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="orc"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+}
+
+```
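As an illustration of how the options above combine (not part of the committed doc): with `partition_dir_expression="${k0}=${v0}"`, `file_name_expression="${transactionId}_${now}"`, `filename_time_format="yyyy.MM.dd"` and `file_format="orc"`, a row with `age=20` would be committed to a path like `hdfs://mycluster/tmp/hive/warehouse/test2/age=20/${transactionId}_2022.08.05.orc`, where `${transactionId}` is generated by the sink.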
diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md
index ffb0efb8c..c8ef79305 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -36,7 +36,7 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
We support the following file types:
-`text` `csv` `parquet`
+`text` `csv` `parquet` `orc`
Please note that the final file name will end with the file_format's suffix; the suffix of a text file is `txt`.
@@ -102,6 +102,8 @@ For the specific meaning of each mode, see [save-modes](https://spark.apache.org
## Example
+For text file format
+
```bash
LocalFile {
@@ -137,3 +139,21 @@ LocalFile {
}
```
+
+For orc file format
+
+```bash
+
+LocalFile {
+ path="file:///tmp/hive/warehouse/test2"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="orc"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+}
+
+```
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index 9473556eb..b046c2278 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -48,6 +48,17 @@
<artifactId>parquet-avro</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>hadoop-common</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index 9b830cd9c..c02f58168 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -22,7 +22,8 @@ import java.io.Serializable;
public enum FileFormat implements Serializable {
CSV("csv"),
TEXT("txt"),
- PARQUET("parquet");
+ PARQUET("parquet"),
+ ORC("orc");
private String suffix;
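For context, a minimal sketch (not part of this commit) of how a configured `file_format` string maps onto this enum; the `getSuffix()` accessor is an assumption based on the `suffix` field above:

```java
// Hypothetical usage sketch: resolve the sink's file_format option to a FileFormat.
// getSuffix() is assumed to be the accessor for the suffix field.
FileFormat format = FileFormat.valueOf("orc".toUpperCase()); // -> FileFormat.ORC
String finalFileName = baseName + "." + format.getSuffix();  // final name ends with ".orc"
```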
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsOrcTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsOrcTransactionStateFileWriter.java
new file mode 100644
index 000000000..0bad801e5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsOrcTransactionStateFileWriter.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.hdfs;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HdfsOrcTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HdfsOrcTransactionStateFileWriter.class);
+ private Map<String, Writer> beingWrittenWriter;
+
+ public HdfsOrcTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo, @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+ @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+ @NonNull List<Integer> sinkColumnsIndexInRow,
+ @NonNull String tmpPath,
+ @NonNull String targetPath,
+ @NonNull String jobId,
+ int subTaskIndex,
+ @NonNull FileSystem fileSystem) {
+ super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
+ this.beingWrittenWriter = new HashMap<>();
+ }
+
+ @Override
+ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+ Writer writer = getOrCreateWriter(filePath);
+ TypeDescription schema = buildSchemaWithRowType();
+ VectorizedRowBatch rowBatch = schema.createRowBatch();
+ int i = 0;
+ int row = rowBatch.size++;
+ for (Integer index : sinkColumnsIndexInRow) {
+ Object value = seaTunnelRow.getField(index);
+ ColumnVector vector = rowBatch.cols[i];
+ setColumn(value, vector, row);
+ i++;
+ }
+ try {
+ writer.addRowBatch(rowBatch);
+ rowBatch.reset();
+ } catch (IOException e) {
+ String errorMsg = String.format("Write data to orc file [%s] error", filePath);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+
+ @Override
+ public void finishAndCloseWriteFile() {
+ this.beingWrittenWriter.forEach((k, v) -> {
+ try {
+ v.close();
+ } catch (IOException e) {
+ String errorMsg = String.format("Close file [%s] orc writer failed, error msg: [%s]", k, e.getMessage());
+ throw new RuntimeException(errorMsg, e);
+ } catch (NullPointerException e) {
+ // The orc writer does not support being closed more than once; a second close throws a NullPointerException.
+ // A complete file sink pass goes through four stages:
+ // 1. beginTransaction 2. prepareCommit 3. commit 4. close
+ // The first stage closes no writers; from the second stage onward, writers are closed.
+ // The last stage closes no writers either,
+ // so the orc writer ends up being closed one extra time after it is already closed.
+ LOGGER.info("Close file [{}] orc writer", k);
+ }
+ needMoveFiles.put(k, getTargetLocation(k));
+ });
+ }
+
+ @Override
+ public void beginTransaction(String transactionId) {
+ this.beingWrittenWriter = new HashMap<>();
+ }
+
+ @Override
+ public void abortTransaction(String transactionId) {
+ this.beingWrittenWriter = new HashMap<>();
+ }
+
+ private Writer getOrCreateWriter(@NonNull String filePath) {
+ Writer writer = this.beingWrittenWriter.get(filePath);
+ if (writer == null) {
+ TypeDescription schema = buildSchemaWithRowType();
+ Path path = new Path(filePath);
+ try {
+ OrcFile.WriterOptions options = OrcFile.writerOptions(HdfsUtils.CONF)
+ .setSchema(schema)
+ // temporarily use snappy compression
+ .compress(CompressionKind.SNAPPY)
+ // use orc version 0.12
+ .version(OrcFile.Version.V_0_12)
+ .overwrite(true);
+ Writer newWriter = OrcFile.createWriter(path, options);
+ this.beingWrittenWriter.put(filePath, newWriter);
+ return newWriter;
+ } catch (IOException e) {
+ String errorMsg = String.format("Get orc writer for file [%s] error", filePath);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+ return writer;
+ }
+
+ private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
+ if (BasicType.BOOLEAN_TYPE.equals(type)) {
+ return TypeDescription.createBoolean();
+ }
+ if (BasicType.SHORT_TYPE.equals(type)) {
+ return TypeDescription.createShort();
+ }
+ if (BasicType.INT_TYPE.equals(type)) {
+ return TypeDescription.createInt();
+ }
+ if (BasicType.LONG_TYPE.equals(type)) {
+ return TypeDescription.createLong();
+ }
+ if (BasicType.FLOAT_TYPE.equals(type)) {
+ return TypeDescription.createFloat();
+ }
+ if (BasicType.DOUBLE_TYPE.equals(type)) {
+ return TypeDescription.createDouble();
+ }
+ if (BasicType.BYTE_TYPE.equals(type)) {
+ return TypeDescription.createByte();
+ }
+ return TypeDescription.createString();
+ }
+
+ private TypeDescription buildSchemaWithRowType() {
+ TypeDescription schema = TypeDescription.createStruct();
+ for (Integer i : sinkColumnsIndexInRow) {
+ TypeDescription fieldType = buildFieldWithRowType(seaTunnelRowTypeInfo.getFieldType(i));
+ schema.addField(seaTunnelRowTypeInfo.getFieldName(i), fieldType);
+ }
+ return schema;
+ }
+
+ private void setColumn(Object value, ColumnVector vector, int row) {
+ if (value == null) {
+ vector.isNull[row] = true;
+ vector.noNulls = false;
+ } else {
+ switch (vector.type) {
+ case LONG:
+ LongColumnVector longVector = (LongColumnVector) vector;
+ setLongColumnVector(value, longVector, row);
+ break;
+ case DOUBLE:
+ DoubleColumnVector doubleColumnVector = (DoubleColumnVector) vector;
+ setDoubleVector(value, doubleColumnVector, row);
+ break;
+ case BYTES:
+ BytesColumnVector bytesColumnVector = (BytesColumnVector) vector;
+ setByteColumnVector(value, bytesColumnVector, row);
+ break;
+ default:
+ throw new RuntimeException("Unexpected ColumnVector subtype");
+ }
+ }
+ }
+
+ private void setLongColumnVector(Object value, LongColumnVector longVector, int row) {
+ if (value instanceof Boolean) {
+ Boolean bool = (Boolean) value;
+ longVector.vector[row] = (bool.equals(Boolean.TRUE)) ? Long.valueOf(1) : Long.valueOf(0);
+ } else if (value instanceof Integer) {
+ longVector.vector[row] = (Integer) value;
+ } else if (value instanceof Long) {
+ longVector.vector[row] = (Long) value;
+ } else if (value instanceof BigInteger) {
+ BigInteger bigInt = (BigInteger) value;
+ longVector.vector[row] = bigInt.longValue();
+ } else {
+ throw new RuntimeException("Long or Integer type expected for field");
+ }
+ }
+
+ private void setByteColumnVector(Object value, BytesColumnVector bytesColVector, int rowNum) {
+ if (value instanceof byte[] || value instanceof String) {
+ byte[] byteVec;
+ if (value instanceof String) {
+ String strVal = (String) value;
+ byteVec = strVal.getBytes(StandardCharsets.UTF_8);
+ } else {
+ byteVec = (byte[]) value;
+ }
+ bytesColVector.setRef(rowNum, byteVec, 0, byteVec.length);
+ } else {
+ throw new RuntimeException("byte[] or String type expected for field ");
+ }
+ }
+
+ private void setDoubleVector(Object value, DoubleColumnVector doubleVector, int rowNum) {
+ if (value instanceof Double) {
+ doubleVector.vector[rowNum] = (Double) value;
+ } else if (value instanceof Float) {
+ Float floatValue = (Float) value;
+ doubleVector.vector[rowNum] = floatValue.doubleValue();
+ } else {
+ throw new RuntimeException("Double or Float type expected for field ");
+ }
+ }
+}
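For readers unfamiliar with orc-core's vectorized API, the write path above boils down to the standard pattern shown in this minimal, self-contained sketch (illustrative only, not part of the commit), using the same snappy/V_0_12 writer options and one row per batch:

```java
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class OrcWriteSketch {
    public static void main(String[] args) throws Exception {
        // Schema matching the docs' example sink_columns: name (string), age (int)
        TypeDescription schema = TypeDescription.fromString("struct<name:string,age:int>");
        Writer writer = OrcFile.createWriter(new Path("/tmp/orc-sketch/test.orc"),
                OrcFile.writerOptions(new Configuration())
                        .setSchema(schema)
                        .compress(CompressionKind.SNAPPY)  // same temporary choice as the connector
                        .version(OrcFile.Version.V_0_12)
                        .overwrite(true));
        VectorizedRowBatch batch = schema.createRowBatch();
        int row = batch.size++;  // the connector likewise writes one row per write() call
        ((BytesColumnVector) batch.cols[0])
                .setVal(row, "seatunnel".getBytes(StandardCharsets.UTF_8));
        ((LongColumnVector) batch.cols[1]).vector[row] = 18L;  // INT columns use LongColumnVector
        writer.addRowBatch(batch);
        batch.reset();
        writer.close();  // close exactly once; see finishAndCloseWriteFile above
    }
}
```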
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
index 5cb6db36b..f4c5cd840 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
@@ -73,6 +73,18 @@ public class HdfsTransactionStateFileWriteFactory {
subTaskIndex,
fileSystem);
}
+ if (fileFormat.equals(FileFormat.ORC)) {
+ return new HdfsOrcTransactionStateFileWriter(
+ seaTunnelRowTypeInfo,
+ transactionFileNameGenerator,
+ partitionDirNameGenerator,
+ sinkColumnsIndexInRow,
+ tmpPath,
+ targetPath,
+ jobId,
+ subTaskIndex,
+ fileSystem);
+ }
// if the file type is not supported by the file connector, the default txt writer will be generated
return new HdfsTxtTransactionStateFileWriter(
seaTunnelRowTypeInfo,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalOrcTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalOrcTransactionStateFileWriter.java
new file mode 100644
index 000000000..19ef12f68
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalOrcTransactionStateFileWriter.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.local;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class LocalOrcTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(LocalOrcTransactionStateFileWriter.class);
+ private final Configuration configuration = new Configuration();
+ private Map<String, Writer> beingWrittenWriter;
+
+ public LocalOrcTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo, @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+ @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+ @NonNull List<Integer> sinkColumnsIndexInRow,
+ @NonNull String tmpPath,
+ @NonNull String targetPath,
+ @NonNull String jobId,
+ int subTaskIndex,
+ @NonNull FileSystem fileSystem) {
+ super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
+ this.beingWrittenWriter = new HashMap<>();
+ }
+
+ @Override
+ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+ Writer writer = getOrCreateWriter(filePath);
+ TypeDescription schema = buildSchemaWithRowType();
+ VectorizedRowBatch rowBatch = schema.createRowBatch();
+ int i = 0;
+ int row = rowBatch.size++;
+ for (Integer index : sinkColumnsIndexInRow) {
+ Object value = seaTunnelRow.getField(index);
+ ColumnVector vector = rowBatch.cols[i];
+ setColumn(value, vector, row);
+ i++;
+ }
+ try {
+ writer.addRowBatch(rowBatch);
+ rowBatch.reset();
+ } catch (IOException e) {
+ String errorMsg = String.format("Write data to orc file [%s] error", filePath);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+
+ @Override
+ public void finishAndCloseWriteFile() {
+ this.beingWrittenWriter.forEach((k, v) -> {
+ try {
+ v.close();
+ } catch (IOException e) {
+ String errorMsg = String.format("Close file [%s] orc writer failed, error msg: [%s]", k, e.getMessage());
+ throw new RuntimeException(errorMsg, e);
+ } catch (NullPointerException e) {
+ // The orc writer does not support being closed more than once; a second close throws a NullPointerException.
+ // A complete file sink pass goes through four stages:
+ // 1. beginTransaction 2. prepareCommit 3. commit 4. close
+ // The first stage closes no writers; from the second stage onward, writers are closed.
+ // The last stage closes no writers either,
+ // so the orc writer ends up being closed one extra time after it is already closed.
+ LOGGER.info("Close file [{}] orc writer", k);
+ }
+ needMoveFiles.put(k, getTargetLocation(k));
+ });
+ }
+
+ @Override
+ public void beginTransaction(String transactionId) {
+ this.beingWrittenWriter = new HashMap<>();
+ }
+
+ @Override
+ public void abortTransaction(String transactionId) {
+ this.beingWrittenWriter = new HashMap<>();
+ }
+
+ private Writer getOrCreateWriter(@NonNull String filePath) {
+ Writer writer = this.beingWrittenWriter.get(filePath);
+ if (writer == null) {
+ TypeDescription schema = buildSchemaWithRowType();
+ Path path = new Path(filePath);
+ try {
+ OrcFile.WriterOptions options = OrcFile.writerOptions(configuration)
+ .setSchema(schema)
+ // temporarily use snappy compression
+ .compress(CompressionKind.SNAPPY)
+ // use orc version 0.12
+ .version(OrcFile.Version.V_0_12)
+ .overwrite(true);
+ Writer newWriter = OrcFile.createWriter(path, options);
+ this.beingWrittenWriter.put(filePath, newWriter);
+ return newWriter;
+ } catch (IOException e) {
+ String errorMsg = String.format("Get orc writer for file [%s] error", filePath);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+ return writer;
+ }
+
+ private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
+ if (BasicType.BOOLEAN_TYPE.equals(type)) {
+ return TypeDescription.createBoolean();
+ }
+ if (BasicType.SHORT_TYPE.equals(type)) {
+ return TypeDescription.createShort();
+ }
+ if (BasicType.INT_TYPE.equals(type)) {
+ return TypeDescription.createInt();
+ }
+ if (BasicType.LONG_TYPE.equals(type)) {
+ return TypeDescription.createLong();
+ }
+ if (BasicType.FLOAT_TYPE.equals(type)) {
+ return TypeDescription.createFloat();
+ }
+ if (BasicType.DOUBLE_TYPE.equals(type)) {
+ return TypeDescription.createDouble();
+ }
+ if (BasicType.BYTE_TYPE.equals(type)) {
+ return TypeDescription.createByte();
+ }
+ return TypeDescription.createString();
+ }
+
+ private TypeDescription buildSchemaWithRowType() {
+ TypeDescription schema = TypeDescription.createStruct();
+ for (Integer i : sinkColumnsIndexInRow) {
+ TypeDescription fieldType = buildFieldWithRowType(seaTunnelRowTypeInfo.getFieldType(i));
+ schema.addField(seaTunnelRowTypeInfo.getFieldName(i), fieldType);
+ }
+ return schema;
+ }
+
+ private void setColumn(Object value, ColumnVector vector, int row) {
+ if (value == null) {
+ vector.isNull[row] = true;
+ vector.noNulls = false;
+ } else {
+ switch (vector.type) {
+ case LONG:
+ LongColumnVector longVector = (LongColumnVector) vector;
+ setLongColumnVector(value, longVector, row);
+ break;
+ case DOUBLE:
+ DoubleColumnVector doubleColumnVector = (DoubleColumnVector) vector;
+ setDoubleVector(value, doubleColumnVector, row);
+ break;
+ case BYTES:
+ BytesColumnVector bytesColumnVector = (BytesColumnVector) vector;
+ setByteColumnVector(value, bytesColumnVector, row);
+ break;
+ default:
+ throw new RuntimeException("Unexpected ColumnVector subtype");
+ }
+ }
+ }
+
+ private void setLongColumnVector(Object value, LongColumnVector longVector, int row) {
+ if (value instanceof Boolean) {
+ Boolean bool = (Boolean) value;
+ longVector.vector[row] = (bool.equals(Boolean.TRUE)) ? Long.valueOf(1) : Long.valueOf(0);
+ } else if (value instanceof Integer) {
+ longVector.vector[row] = (Integer) value;
+ } else if (value instanceof Long) {
+ longVector.vector[row] = (Long) value;
+ } else if (value instanceof BigInteger) {
+ BigInteger bigInt = (BigInteger) value;
+ longVector.vector[row] = bigInt.longValue();
+ } else {
+ throw new RuntimeException("Long or Integer type expected for field");
+ }
+ }
+
+ private void setByteColumnVector(Object value, BytesColumnVector bytesColVector, int rowNum) {
+ if (value instanceof byte[] || value instanceof String) {
+ byte[] byteVec;
+ if (value instanceof String) {
+ String strVal = (String) value;
+ byteVec = strVal.getBytes(StandardCharsets.UTF_8);
+ } else {
+ byteVec = (byte[]) value;
+ }
+ bytesColVector.setRef(rowNum, byteVec, 0, byteVec.length);
+ } else {
+ throw new RuntimeException("byte[] or String type expected for field ");
+ }
+ }
+
+ private void setDoubleVector(Object value, DoubleColumnVector doubleVector, int rowNum) {
+ if (value instanceof Double) {
+ doubleVector.vector[rowNum] = (Double) value;
+ } else if (value instanceof Float) {
+ Float floatValue = (Float) value;
+ doubleVector.vector[rowNum] = floatValue.doubleValue();
+ } else {
+ throw new RuntimeException("Double or Float type expected for field ");
+ }
+ }
+}
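To verify output from either writer, the files can be read back with orc-core's reader. A sketch under assumed names (the path is hypothetical; the column layout follows the docs' example):

```java
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class OrcReadSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical output file produced by the LocalFile sink example above
        Reader reader = OrcFile.createReader(new Path("/tmp/hive/warehouse/test2/example.orc"),
                OrcFile.readerOptions(new Configuration()));
        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        RecordReader rows = reader.rows();
        while (rows.nextBatch(batch)) {
            BytesColumnVector name = (BytesColumnVector) batch.cols[0];
            LongColumnVector age = (LongColumnVector) batch.cols[1];
            for (int r = 0; r < batch.size; r++) {
                // Decode the string column from its backing byte range
                String nameValue = new String(name.vector[r], name.start[r], name.length[r],
                        StandardCharsets.UTF_8);
                System.out.println(nameValue + "," + age.vector[r]);
            }
        }
        rows.close();
    }
}
```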
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
index c4244bfc0..05e92d5fd 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
@@ -73,6 +73,18 @@ public class LocalTransactionStateFileWriteFactory {
subTaskIndex,
fileSystem);
}
+ if (fileFormat.equals(FileFormat.ORC)) {
+ return new LocalOrcTransactionStateFileWriter(
+ seaTunnelRowTypeInfo,
+ transactionFileNameGenerator,
+ partitionDirNameGenerator,
+ sinkColumnsIndexInRow,
+ tmpPath,
+ targetPath,
+ jobId,
+ subTaskIndex,
+ fileSystem);
+ }
// if the file type is not supported by the file connector, the default txt writer will be generated
return new LocalTxtTransactionStateFileWriter(
seaTunnelRowTypeInfo,