Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/07/27 08:42:25 UTC
[incubator-seatunnel] branch dev updated: [Connector-V2] Add parquet writer in file connector (#2273)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c95cc72cf [Connector-V2] Add parquet writer in file connector (#2273)
c95cc72cf is described below
commit c95cc72cfa87589d46deaf22ff030059b072316a
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Wed Jul 27 16:42:19 2022 +0800
[Connector-V2] Add parquet writer in file connector (#2273)
---
docs/en/connector-v2/sink/File.mdx | 4 +-
.../connector-file/connector-file-base/pom.xml | 8 +
.../seatunnel/file/config/FileFormat.java | 3 +-
.../FileSinkTransactionFileNameGenerator.java | 2 +
.../file/sink/hdfs/HdfsFileSinkPlugin.java | 14 +-
.../HdfsParquetTransactionStateFileWriter.java | 174 ++++++++++++++++++++
.../hdfs/HdfsTransactionStateFileWriteFactory.java | 90 +++++++++++
.../seatunnel/file/sink/hdfs/HdfsUtils.java | 29 ++--
.../connector-file/connector-file-local/pom.xml | 6 +
.../file/sink/local/LocalFileSinkPlugin.java | 14 +-
.../LocalParquetTransactionStateFileWriter.java | 177 +++++++++++++++++++++
.../LocalTransactionStateFileWriteFactory.java | 90 +++++++++++
12 files changed, 573 insertions(+), 38 deletions(-)
diff --git a/docs/en/connector-v2/sink/File.mdx b/docs/en/connector-v2/sink/File.mdx
index 7e2e3efd7..74311b296 100644
--- a/docs/en/connector-v2/sink/File.mdx
+++ b/docs/en/connector-v2/sink/File.mdx
@@ -46,7 +46,9 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
### file_format [string]
-We supported `file_format` is `text`.
+We support the following file types:
+
+`text` `parquet`
Please note that the final file name will end with the file_format's suffix; the suffix of the text file is `txt`.
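With this change a File sink can emit parquet by setting `file_format`. A minimal sink block might look like the following (a sketch only; option names other than `file_format` and `is_enable_transaction` are assumptions, not taken from this diff):

    LocalFile {
      path = "/tmp/seatunnel/output"
      file_format = "parquet"
      is_enable_transaction = true
    }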
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index 2f3c38c51..0bcff2db2 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -43,10 +43,16 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
+
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -63,10 +69,12 @@
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<scope>test</scope>
</dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index 6b3f31f79..9b830cd9c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -21,7 +21,8 @@ import java.io.Serializable;
public enum FileFormat implements Serializable {
CSV("csv"),
- TEXT("txt");
+ TEXT("txt"),
+ PARQUET("parquet");
private String suffix;
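The new enum constant carries the `parquet` suffix that ends up in generated file names. Roughly how the suffix is consumed (a hedged sketch assuming a getSuffix() accessor, which this hunk does not show):

    // e.g. yields "<transactionId>.parquet" for a parquet sink
    String fileName = transactionId + "." + FileFormat.PARQUET.getSuffix();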
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkTransactionFileNameGenerator.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkTransactionFileNameGenerator.java
index ba005c7de..1c253cc5d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkTransactionFileNameGenerator.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkTransactionFileNameGenerator.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.config.Constant;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import lombok.Data;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
@@ -31,6 +32,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+@Data
public class FileSinkTransactionFileNameGenerator implements TransactionFileNameGenerator {
private FileFormat fileFormat;
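For context on the `@Data` addition: Lombok's @Data generates getters, setters, equals/hashCode and toString for the class, and the writer factories introduced below depend on the getter for the configured format. What Lombok emits is roughly (a sketch of generated code, not part of this diff):

    // generated by @Data; used by the factories to dispatch on file format
    public FileFormat getFileFormat() {
        return this.fileFormat;
    }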
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsFileSinkPlugin.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsFileSinkPlugin.java
index 6e6c9380c..d93c126cd 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsFileSinkPlugin.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsFileSinkPlugin.java
@@ -49,17 +49,9 @@ public class HdfsFileSinkPlugin implements SinkFileSystemPlugin {
@NonNull String fieldDelimiter,
@NonNull String rowDelimiter,
@NonNull FileSystem fileSystem) {
- return Optional.of(new HdfsTxtTransactionStateFileWriter(seaTunnelRowTypeInfo,
- transactionFileNameGenerator,
- partitionDirNameGenerator,
- sinkColumnsIndexInRow,
- tmpPath,
- targetPath,
- jobId,
- subTaskIndex,
- fieldDelimiter,
- rowDelimiter,
- fileSystem));
+ // use the factory to create the transaction state file writer
+ TransactionStateFileWriter writer = HdfsTransactionStateFileWriteFactory.of(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fieldDelimiter, rowDelimiter, fileSystem);
+ return Optional.of(writer);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsParquetTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsParquetTransactionStateFileWriter.java
new file mode 100644
index 000000000..b309bef7a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsParquetTransactionStateFileWriter.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.hdfs;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HdfsParquetTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HdfsParquetTransactionStateFileWriter.class);
+ private Map<String, ParquetWriter<GenericRecord>> beingWrittenWriter;
+
+ public HdfsParquetTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+ @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+ @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+ @NonNull List<Integer> sinkColumnsIndexInRow, @NonNull String tmpPath,
+ @NonNull String targetPath,
+ @NonNull String jobId,
+ int subTaskIndex,
+ @NonNull FileSystem fileSystem) {
+ super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
+ beingWrittenWriter = new HashMap<>();
+ }
+
+ @Override
+ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+ ParquetWriter<GenericRecord> writer = getOrCreateWriter(filePath);
+ Schema schema = buildSchemaWithRowType();
+ GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
+ sinkColumnsIndexInRow.forEach(index -> {
+ if (seaTunnelRowTypeInfo.getFieldType(index).equals(BasicType.STRING_TYPE)) {
+ recordBuilder.set(seaTunnelRowTypeInfo.getFieldName(index), seaTunnelRow.getField(index).toString());
+ } else {
+ recordBuilder.set(seaTunnelRowTypeInfo.getFieldName(index), seaTunnelRow.getField(index));
+ }
+ });
+ GenericData.Record record = recordBuilder.build();
+ try {
+ writer.write(record);
+ } catch (IOException e) {
+ String errorMsg = String.format("Write data to file [%s] error", filePath);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+
+ @Override
+ public void finishAndCloseWriteFile() {
+ this.beingWrittenWriter.forEach((k, v) -> {
+ try {
+ v.close();
+ } catch (IOException e) {
+ String errorMsg = String.format("Close file [%s] parquet writer failed, error msg: [%s]", k, e.getMessage());
+ throw new RuntimeException(errorMsg, e);
+ }
+ needMoveFiles.put(k, getTargetLocation(k));
+ });
+ }
+
+ @Override
+ public void beginTransaction(String transactionId) {
+ this.beingWrittenWriter = new HashMap<>();
+ }
+
+ @Override
+ public void abortTransaction(String transactionId) {
+ this.beingWrittenWriter = new HashMap<>();
+ }
+
+ private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath) {
+ ParquetWriter<GenericRecord> writer = this.beingWrittenWriter.get(filePath);
+ if (writer == null) {
+ Schema schema = buildSchemaWithRowType();
+ Path path = new Path(filePath);
+ try {
+ HadoopOutputFile outputFile = HadoopOutputFile.fromPath(path, HdfsUtils.CONF);
+ ParquetWriter<GenericRecord> newWriter = AvroParquetWriter.<GenericRecord>builder(outputFile)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ // use parquet v1 to improve compatibility
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
+ // temporarily use snappy compression
+ // this could later be controlled by a compression option in the config
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withSchema(schema)
+ .build();
+ this.beingWrittenWriter.put(filePath, newWriter);
+ return newWriter;
+ } catch (IOException e) {
+ String errorMsg = String.format("Get parquet writer for file [%s] error", filePath);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+ return writer;
+ }
+
+ private Schema buildSchemaWithRowType() {
+ ArrayList<Schema.Field> fields = new ArrayList<>();
+ SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowTypeInfo.getFieldTypes();
+ String[] fieldNames = seaTunnelRowTypeInfo.getFieldNames();
+ sinkColumnsIndexInRow.forEach(index -> {
+ if (BasicType.BOOLEAN_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.BOOLEAN), null, null);
+ fields.add(field);
+ } else if (BasicType.SHORT_TYPE.equals(fieldTypes[index]) || BasicType.INT_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.INT), null, null);
+ fields.add(field);
+ } else if (BasicType.LONG_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.LONG), null, null);
+ fields.add(field);
+ } else if (BasicType.FLOAT_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.FLOAT), null, null);
+ fields.add(field);
+ } else if (BasicType.DOUBLE_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.DOUBLE), null, null);
+ fields.add(field);
+ } else if (BasicType.STRING_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.STRING), null, null);
+ fields.add(field);
+ } else if (BasicType.BYTE_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.BYTES), null, null);
+ fields.add(field);
+ } else if (BasicType.VOID_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.NULL), null, null);
+ fields.add(field);
+ }
+ });
+ return Schema.createRecord("SeatunnelRecord",
+ "The record generated by seatunnel file connector",
+ "org.apache.parquet.avro",
+ false,
+ fields);
+ }
+}
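To sanity-check files produced by a writer like the one above, parquet-avro's companion reader can stream the records back. A minimal, self-contained sketch (the path argument is a placeholder; this class is not part of the commit):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class ParquetReadBack {
        public static void main(String[] args) throws Exception {
            // args[0]: path to a .parquet file produced by the sink
            Path path = new Path(args[0]);
            try (ParquetReader<GenericRecord> reader = AvroParquetReader
                    .<GenericRecord>builder(HadoopInputFile.fromPath(path, new Configuration()))
                    .build()) {
                GenericRecord record;
                while ((record = reader.read()) != null) {
                    System.out.println(record);
                }
            }
        }
    }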
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
new file mode 100644
index 000000000..5cb6db36b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsTransactionStateFileWriteFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.hdfs;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+
+import java.util.List;
+
+public class HdfsTransactionStateFileWriteFactory {
+
+ private HdfsTransactionStateFileWriteFactory() {}
+
+ public static TransactionStateFileWriter of(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+ @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+ @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+ @NonNull List<Integer> sinkColumnsIndexInRow,
+ @NonNull String tmpPath,
+ @NonNull String targetPath,
+ @NonNull String jobId,
+ int subTaskIndex,
+ @NonNull String fieldDelimiter,
+ @NonNull String rowDelimiter,
+ @NonNull FileSystem fileSystem) {
+ FileSinkTransactionFileNameGenerator fileSinkTransactionFileNameGenerator = (FileSinkTransactionFileNameGenerator) transactionFileNameGenerator;
+ FileFormat fileFormat = fileSinkTransactionFileNameGenerator.getFileFormat();
+ if (fileFormat.equals(FileFormat.CSV)) {
+ // TODO: replace this with a real csv writer once issue #2133 is closed
+ return new HdfsTxtTransactionStateFileWriter(
+ seaTunnelRowTypeInfo,
+ transactionFileNameGenerator,
+ partitionDirNameGenerator,
+ sinkColumnsIndexInRow,
+ tmpPath,
+ targetPath,
+ jobId,
+ subTaskIndex,
+ fieldDelimiter,
+ rowDelimiter,
+ fileSystem);
+ }
+ if (fileFormat.equals(FileFormat.PARQUET)) {
+ return new HdfsParquetTransactionStateFileWriter(
+ seaTunnelRowTypeInfo,
+ transactionFileNameGenerator,
+ partitionDirNameGenerator,
+ sinkColumnsIndexInRow,
+ tmpPath,
+ targetPath,
+ jobId,
+ subTaskIndex,
+ fileSystem);
+ }
+ // if the file type is not supported by the file connector, fall back to the default txt writer
+ return new HdfsTxtTransactionStateFileWriter(
+ seaTunnelRowTypeInfo,
+ transactionFileNameGenerator,
+ partitionDirNameGenerator,
+ sinkColumnsIndexInRow,
+ tmpPath,
+ targetPath,
+ jobId,
+ subTaskIndex,
+ fieldDelimiter,
+ rowDelimiter,
+ fileSystem);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsUtils.java
index 421c7f7eb..97a1429c3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsUtils.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/HdfsUtils.java
@@ -37,14 +37,19 @@ public class HdfsUtils {
public static final int WRITE_BUFFER_SIZE = 2048;
- public static FileSystem getHdfsFs(@NonNull String path)
- throws IOException {
- Configuration conf = new Configuration();
+ public static final Configuration CONF = new Configuration();
+
+ // make the configuration object static so that the orc and parquet readers can get it
+ static {
LOGGER.info(System.getenv("HADOOP_CONF_DIR"));
- conf.addResource(new Path(System.getenv("HADOOP_CONF_DIR") + "/core-site.xml"));
- conf.addResource(new Path(System.getenv("HADOOP_CONF_DIR") + "/hdfs-site.xml"));
- conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
- return FileSystem.get(URI.create(path), conf);
+ CONF.addResource(new Path(System.getenv("HADOOP_CONF_DIR") + "/core-site.xml"));
+ CONF.addResource(new Path(System.getenv("HADOOP_CONF_DIR") + "/hdfs-site.xml"));
+ CONF.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ }
+
+ public static FileSystem getHdfsFs(@NonNull String path)
+ throws IOException {
+ return FileSystem.get(URI.create(path), CONF);
}
public static FSDataOutputStream getOutputStream(@NonNull String outFilePath) throws IOException {
@@ -99,9 +104,7 @@ public class HdfsUtils {
}
}
- public static void createDir(@NonNull String filePath)
- throws IOException {
-
+ public static void createDir(@NonNull String filePath) throws IOException {
FileSystem hdfsFs = getHdfsFs(filePath);
Path dfs = new Path(filePath);
if (!hdfsFs.mkdirs(dfs)) {
@@ -109,8 +112,7 @@ public class HdfsUtils {
}
}
- public static boolean fileExist(@NonNull String filePath)
- throws IOException {
+ public static boolean fileExist(@NonNull String filePath) throws IOException {
FileSystem hdfsFs = getHdfsFs(filePath);
Path fileName = new Path(filePath);
return hdfsFs.exists(fileName);
@@ -119,8 +121,7 @@ public class HdfsUtils {
/**
* get the dir in filePath
*/
- public static List<Path> dirList(@NonNull String filePath)
- throws FileNotFoundException, IOException {
+ public static List<Path> dirList(@NonNull String filePath) throws FileNotFoundException, IOException {
FileSystem hdfsFs = getHdfsFs(filePath);
List<Path> pathList = new ArrayList<Path>();
Path fileName = new Path(filePath);
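One caveat about the static initializer introduced above: when HADOOP_CONF_DIR is unset, the string concatenation yields resource paths like `null/core-site.xml`. A defensive variant could guard against that (a sketch, not the committed code):

    static {
        String confDir = System.getenv("HADOOP_CONF_DIR");
        LOGGER.info("HADOOP_CONF_DIR: {}", confDir);
        if (confDir != null) {
            // only add the site files when the env var is actually set
            CONF.addResource(new Path(confDir + "/core-site.xml"));
            CONF.addResource(new Path(confDir + "/hdfs-site.xml"));
        }
        CONF.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
    }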
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
index 1ac5bb77b..90797614c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
@@ -35,6 +35,12 @@
<artifactId>connector-file-base</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop-2</artifactId>
+ <version>${flink-shaded-hadoop-2.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalFileSinkPlugin.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalFileSinkPlugin.java
index 1d4bc43e5..a50cfe263 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalFileSinkPlugin.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalFileSinkPlugin.java
@@ -49,17 +49,9 @@ public class LocalFileSinkPlugin implements SinkFileSystemPlugin {
@NonNull String fieldDelimiter,
@NonNull String rowDelimiter,
@NonNull FileSystem fileSystem) {
- return Optional.of(new LocalTxtTransactionStateFileWriter(seaTunnelRowTypeInfo,
- transactionFileNameGenerator,
- partitionDirNameGenerator,
- sinkColumnsIndexInRow,
- tmpPath,
- targetPath,
- jobId,
- subTaskIndex,
- fieldDelimiter,
- rowDelimiter,
- fileSystem));
+ // use the factory to create the transaction state file writer
+ TransactionStateFileWriter writer = LocalTransactionStateFileWriteFactory.of(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fieldDelimiter, rowDelimiter, fileSystem);
+ return Optional.of(writer);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalParquetTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalParquetTransactionStateFileWriter.java
new file mode 100644
index 000000000..ec5833e4a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalParquetTransactionStateFileWriter.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.local;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class LocalParquetTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(LocalParquetTransactionStateFileWriter.class);
+ private final Configuration configuration = new Configuration();
+ private Map<String, ParquetWriter<GenericRecord>> beingWrittenWriter;
+
+ public LocalParquetTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+ @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+ @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+ @NonNull List<Integer> sinkColumnsIndexInRow, @NonNull String tmpPath,
+ @NonNull String targetPath,
+ @NonNull String jobId,
+ int subTaskIndex,
+ @NonNull FileSystem fileSystem) {
+ super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
+ beingWrittenWriter = new HashMap<>();
+ }
+
+ @Override
+ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+ ParquetWriter<GenericRecord> writer = getOrCreateWriter(filePath);
+ Schema schema = buildSchemaWithRowType();
+ GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
+ sinkColumnsIndexInRow.forEach(index -> {
+ if (seaTunnelRowTypeInfo.getFieldType(index).equals(BasicType.STRING_TYPE)) {
+ recordBuilder.set(seaTunnelRowTypeInfo.getFieldName(index), seaTunnelRow.getField(index).toString());
+ } else {
+ recordBuilder.set(seaTunnelRowTypeInfo.getFieldName(index), seaTunnelRow.getField(index));
+ }
+ });
+ GenericData.Record record = recordBuilder.build();
+ try {
+ writer.write(record);
+ } catch (IOException e) {
+ String errorMsg = String.format("Write data to parquet file [%s] error", filePath);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+
+ @Override
+ public void finishAndCloseWriteFile() {
+ this.beingWrittenWriter.forEach((k, v) -> {
+ try {
+ v.close();
+ } catch (IOException e) {
+ String errorMsg = String.format("Close file [%s] parquet writer failed, error msg: [%s]", k, e.getMessage());
+ throw new RuntimeException(errorMsg, e);
+ }
+ needMoveFiles.put(k, getTargetLocation(k));
+ });
+ }
+
+ @Override
+ public void beginTransaction(String transactionId) {
+ this.beingWrittenWriter = new HashMap<>();
+ }
+
+ @Override
+ public void abortTransaction(String transactionId) {
+ this.beingWrittenWriter = new HashMap<>();
+ }
+
+ private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath) {
+ ParquetWriter<GenericRecord> writer = this.beingWrittenWriter.get(filePath);
+ if (writer == null) {
+ Schema schema = buildSchemaWithRowType();
+ Path path = new Path(filePath);
+ try {
+ // to write files to the local file system we use an empty configuration object
+ HadoopOutputFile outputFile = HadoopOutputFile.fromPath(path, configuration);
+ ParquetWriter<GenericRecord> newWriter = AvroParquetWriter.<GenericRecord>builder(outputFile)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ // use parquet v1 to improve compatibility
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
+ // temporarily use snappy compression
+ // this could later be controlled by a compression option in the config
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withSchema(schema)
+ .build();
+ this.beingWrittenWriter.put(filePath, newWriter);
+ return newWriter;
+ } catch (IOException e) {
+ String errorMsg = String.format("Get parquet writer for file [%s] error", filePath);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+ return writer;
+ }
+
+ private Schema buildSchemaWithRowType() {
+ ArrayList<Schema.Field> fields = new ArrayList<>();
+ SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowTypeInfo.getFieldTypes();
+ String[] fieldNames = seaTunnelRowTypeInfo.getFieldNames();
+ sinkColumnsIndexInRow.forEach(index -> {
+ if (BasicType.BOOLEAN_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.BOOLEAN), null, null);
+ fields.add(field);
+ } else if (BasicType.SHORT_TYPE.equals(fieldTypes[index]) || BasicType.INT_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.INT), null, null);
+ fields.add(field);
+ } else if (BasicType.LONG_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.LONG), null, null);
+ fields.add(field);
+ } else if (BasicType.FLOAT_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.FLOAT), null, null);
+ fields.add(field);
+ } else if (BasicType.DOUBLE_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.DOUBLE), null, null);
+ fields.add(field);
+ } else if (BasicType.STRING_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.STRING), null, null);
+ fields.add(field);
+ } else if (BasicType.BYTE_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.BYTES), null, null);
+ fields.add(field);
+ } else if (BasicType.VOID_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.NULL), null, null);
+ fields.add(field);
+ }
+ });
+ return Schema.createRecord("SeatunnelRecord",
+ "The record generated by seatunnel file connector",
+ "org.apache.parquet.avro",
+ false,
+ fields);
+ }
+}
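An observation that applies to both parquet writers: write() rebuilds the Avro schema via buildSchemaWithRowType() for every row, although the row type is fixed at construction time. Caching it once would avoid the per-row rebuild (a hypothetical refactor, not part of this commit):

    // hypothetical: build the schema once and reuse it for every row
    private Schema cachedSchema;

    private Schema schema() {
        if (cachedSchema == null) {
            cachedSchema = buildSchemaWithRowType();
        }
        return cachedSchema;
    }
    // write() would then call schema() instead of buildSchemaWithRowType()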
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
new file mode 100644
index 000000000..c4244bfc0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/LocalTransactionStateFileWriteFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.local;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+
+import java.util.List;
+
+public class LocalTransactionStateFileWriteFactory {
+
+ private LocalTransactionStateFileWriteFactory() {}
+
+ public static TransactionStateFileWriter of(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+ @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+ @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+ @NonNull List<Integer> sinkColumnsIndexInRow,
+ @NonNull String tmpPath,
+ @NonNull String targetPath,
+ @NonNull String jobId,
+ int subTaskIndex,
+ @NonNull String fieldDelimiter,
+ @NonNull String rowDelimiter,
+ @NonNull FileSystem fileSystem) {
+ FileSinkTransactionFileNameGenerator fileSinkTransactionFileNameGenerator = (FileSinkTransactionFileNameGenerator) transactionFileNameGenerator;
+ FileFormat fileFormat = fileSinkTransactionFileNameGenerator.getFileFormat();
+ if (fileFormat.equals(FileFormat.CSV)) {
+ // TODO: replace this with a real csv writer once issue #2133 is closed
+ return new LocalTxtTransactionStateFileWriter(
+ seaTunnelRowTypeInfo,
+ transactionFileNameGenerator,
+ partitionDirNameGenerator,
+ sinkColumnsIndexInRow,
+ tmpPath,
+ targetPath,
+ jobId,
+ subTaskIndex,
+ fieldDelimiter,
+ rowDelimiter,
+ fileSystem);
+ }
+ if (fileFormat.equals(FileFormat.PARQUET)) {
+ return new LocalParquetTransactionStateFileWriter(
+ seaTunnelRowTypeInfo,
+ transactionFileNameGenerator,
+ partitionDirNameGenerator,
+ sinkColumnsIndexInRow,
+ tmpPath,
+ targetPath,
+ jobId,
+ subTaskIndex,
+ fileSystem);
+ }
+ // if the file type is not supported by the file connector, fall back to the default txt writer
+ return new LocalTxtTransactionStateFileWriter(
+ seaTunnelRowTypeInfo,
+ transactionFileNameGenerator,
+ partitionDirNameGenerator,
+ sinkColumnsIndexInRow,
+ tmpPath,
+ targetPath,
+ jobId,
+ subTaskIndex,
+ fieldDelimiter,
+ rowDelimiter,
+ fileSystem);
+ }
+}