You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/18 02:32:08 UTC

[incubator-seatunnel] branch dev updated: [Bug][Connectors] Text And Json WriteStrategy lost the sinkColumnsIndexInRow (#3863)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7b5f6f1bc [Bug][Connectors] Text And Json WriteStrategy lost the sinkColumnsIndexInRow (#3863)
7b5f6f1bc is described below

commit 7b5f6f1bc2f2038ff6ec25abdecc87f2d990c24f
Author: lvshaokang <lv...@hotmail.com>
AuthorDate: Wed Jan 18 10:32:02 2023 +0800

    [Bug][Connectors] Text And Json WriteStrategy lost the sinkColumnsIndexInRow (#3863)
    
    * [Bug][Connectors] Text And Json WriteStrategy lost the sinkColumnsIndexInRow
---
 .../seatunnel/file/sink/writer/AbstractWriteStrategy.java | 15 +++++++++++++++
 .../seatunnel/file/sink/writer/JsonWriteStrategy.java     |  7 +++++--
 .../seatunnel/file/sink/writer/TextWriteStrategy.java     |  6 ++++--
 3 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 329af511a..79f124e42 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -22,6 +22,7 @@ import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECOR
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
 
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.Constants;
@@ -117,6 +118,20 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
         currentBatchSize++;
     }
 
+    protected SeaTunnelRowType buildSchemaWithRowType(SeaTunnelRowType seaTunnelRowType, List<Integer> sinkColumnsIndex) {
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        List<String> newFieldNames = new ArrayList<>();
+        List<SeaTunnelDataType<?>> newFieldTypes = new ArrayList<>();
+        sinkColumnsIndex.forEach(index -> {
+            newFieldNames.add(fieldNames[index]);
+            newFieldTypes.add(fieldTypes[index]);
+        });
+        return new SeaTunnelRowType(
+            newFieldNames.toArray(new String[0]),
+            newFieldTypes.toArray(new SeaTunnelDataType[0]));
+    }
+
     /**
      * use hadoop conf generate hadoop configuration
      *
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index 7d9b7ad0d..38b9b2296 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -50,7 +50,8 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
     @Override
     public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
         super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
-        this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType);
+        this.serializationSchema = new JsonSerializationSchema(
+            buildSchemaWithRowType(seaTunnelRowType, sinkColumnsIndexInRow));
     }
 
     @Override
@@ -59,7 +60,9 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
         String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
         FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath);
         try {
-            byte[] rowBytes = serializationSchema.serialize(seaTunnelRow);
+            byte[] rowBytes = serializationSchema.serialize(seaTunnelRow.copy(sinkColumnsIndexInRow.stream()
+                .mapToInt(Integer::intValue)
+                .toArray()));
             if (isFirstWrite.get(filePath)) {
                 isFirstWrite.put(filePath, false);
             } else {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index 6698e2ee2..9b0c44a90 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -62,7 +62,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
     public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
         super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
         this.serializationSchema = TextSerializationSchema.builder()
-                .seaTunnelRowType(seaTunnelRowType)
+                .seaTunnelRowType(buildSchemaWithRowType(seaTunnelRowType, sinkColumnsIndexInRow))
                 .delimiter(fieldDelimiter)
                 .dateFormatter(dateFormat)
                 .dateTimeFormatter(dateTimeFormat)
@@ -81,7 +81,9 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
             } else {
                 fsDataOutputStream.write(rowDelimiter.getBytes());
             }
-            fsDataOutputStream.write(serializationSchema.serialize(seaTunnelRow));
+            fsDataOutputStream.write(serializationSchema.serialize(seaTunnelRow.copy(sinkColumnsIndexInRow.stream()
+                .mapToInt(Integer::intValue)
+                .toArray())));
         } catch (IOException e) {
             throw new FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
                     String.format("Write data to file [%s] failed", filePath), e);