You are viewing a plain-text version of this content. The hyperlink to the canonical version is not preserved in this plain-text copy.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/18 02:32:08 UTC
[incubator-seatunnel] branch dev updated: [Bug][Connectors] Text And Json WriteStrategy lost the sinkColumnsIndexInRow (#3863)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7b5f6f1bc [Bug][Connectors] Text And Json WriteStrategy lost the sinkColumnsIndexInRow (#3863)
7b5f6f1bc is described below
commit 7b5f6f1bc2f2038ff6ec25abdecc87f2d990c24f
Author: lvshaokang <lv...@hotmail.com>
AuthorDate: Wed Jan 18 10:32:02 2023 +0800
[Bug][Connectors] Text And Json WriteStrategy lost the sinkColumnsIndexInRow (#3863)
* [Bug][Connectors] Text And Json WriteStrategy lost the sinkColumnsIndexInRow
---
.../seatunnel/file/sink/writer/AbstractWriteStrategy.java | 15 +++++++++++++++
.../seatunnel/file/sink/writer/JsonWriteStrategy.java | 7 +++++--
.../seatunnel/file/sink/writer/TextWriteStrategy.java | 6 ++++--
3 files changed, 24 insertions(+), 4 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 329af511a..79f124e42 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -22,6 +22,7 @@ import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECOR
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.Constants;
@@ -117,6 +118,20 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
currentBatchSize++;
}
+ protected SeaTunnelRowType buildSchemaWithRowType(SeaTunnelRowType seaTunnelRowType, List<Integer> sinkColumnsIndex) {
+ // Project the row type onto the sink columns only, in the order listed by sinkColumnsIndex.
+ SeaTunnelDataType<?>[] sourceTypes = seaTunnelRowType.getFieldTypes();
+ String[] sourceNames = seaTunnelRowType.getFieldNames();
+ int columnCount = sinkColumnsIndex.size();
+ String[] sinkNames = new String[columnCount];
+ SeaTunnelDataType<?>[] sinkTypes = new SeaTunnelDataType<?>[columnCount];
+ for (int position = 0; position < columnCount; position++) {
+ sinkNames[position] = sourceNames[sinkColumnsIndex.get(position)];
+ sinkTypes[position] = sourceTypes[sinkColumnsIndex.get(position)];
+ }
+ return new SeaTunnelRowType(sinkNames, sinkTypes);
+ }
+
/**
* use hadoop conf generate hadoop configuration
*
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index 7d9b7ad0d..38b9b2296 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -50,7 +50,8 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
- this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType);
+ SeaTunnelRowType sinkRowType = buildSchemaWithRowType(seaTunnelRowType, sinkColumnsIndexInRow);
+ this.serializationSchema = new JsonSerializationSchema(sinkRowType); // schema must match the projected rows written per record
}
@Override
@@ -59,7 +60,9 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath);
try {
- byte[] rowBytes = serializationSchema.serialize(seaTunnelRow);
+ byte[] rowBytes = serializationSchema.serialize(seaTunnelRow.copy(sinkColumnsIndexInRow.stream()
+ .mapToInt(Integer::intValue)
+ .toArray()));
if (isFirstWrite.get(filePath)) {
isFirstWrite.put(filePath, false);
} else {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index 6698e2ee2..9b0c44a90 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -62,7 +62,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
this.serializationSchema = TextSerializationSchema.builder()
- .seaTunnelRowType(seaTunnelRowType)
+ .seaTunnelRowType(buildSchemaWithRowType(seaTunnelRowType, sinkColumnsIndexInRow))
.delimiter(fieldDelimiter)
.dateFormatter(dateFormat)
.dateTimeFormatter(dateTimeFormat)
@@ -81,7 +81,9 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
} else {
fsDataOutputStream.write(rowDelimiter.getBytes());
}
- fsDataOutputStream.write(serializationSchema.serialize(seaTunnelRow));
+ fsDataOutputStream.write(serializationSchema.serialize(seaTunnelRow.copy(sinkColumnsIndexInRow.stream()
+ .mapToInt(Integer::intValue)
+ .toArray())));
} catch (IOException e) {
throw new FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
String.format("Write data to file [%s] failed", filePath), e);