You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/05 12:51:26 UTC

[incubator-seatunnel] branch dev updated: [Improve][format][text] Support read & write SeaTunnelRow type (#2969)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 50a725490 [Improve][format][text] Support read & write SeaTunnelRow type (#2969)
50a725490 is described below

commit 50a725490255f9bac96aa1cadd7f90d7302f1464
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Wed Oct 5 20:51:21 2022 +0800

    [Improve][format][text] Support read & write SeaTunnelRow type (#2969)
---
 .../seatunnel/common/utils/DateTimeUtils.java      | 11 ++++++
 .../apache/seatunnel/common/utils/DateUtils.java   | 11 ++++++
 .../apache/seatunnel/common/utils/TimeUtils.java   | 11 ++++++
 .../format/text/TextDeserializationSchema.java     | 46 ++++++++++++++++++----
 .../format/text/TextSerializationSchema.java       | 15 +++++--
 5 files changed, 83 insertions(+), 11 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java
index a23f174c0..ff69533f3 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java
@@ -56,5 +56,16 @@ public class DateTimeUtils {
         public String getValue() {
             return value;
         }
+
+        public static Formatter parse(String format) {
+            Formatter[] formatters = Formatter.values();
+            for (Formatter formatter : formatters) {
+                if (formatter.getValue().equals(format)) {
+                    return formatter;
+                }
+            }
+            String errorMsg = String.format("Illegal format [%s]", format);
+            throw new IllegalArgumentException(errorMsg);
+        }
     }
 }
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java
index 7a67055ce..ed67f74d3 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java
@@ -52,5 +52,16 @@ public class DateUtils {
         public String getValue() {
             return value;
         }
+
+        public static Formatter parse(String format) {
+            Formatter[] formatters = Formatter.values();
+            for (Formatter formatter : formatters) {
+                if (formatter.getValue().equals(format)) {
+                    return formatter;
+                }
+            }
+            String errorMsg = String.format("Illegal format [%s]", format);
+            throw new IllegalArgumentException(errorMsg);
+        }
     }
 }
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TimeUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TimeUtils.java
index 8d0e1584b..76ee5c5e4 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TimeUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TimeUtils.java
@@ -50,5 +50,16 @@ public class TimeUtils {
         public String getValue() {
             return value;
         }
+
+        public static Formatter parse(String format) {
+            Formatter[] formatters = Formatter.values();
+            for (Formatter formatter : formatters) {
+                if (formatter.getValue().equals(format)) {
+                    return formatter;
+                }
+            }
+            String errorMsg = String.format("Illegal format [%s]", format);
+            throw new IllegalArgumentException(errorMsg);
+        }
     }
 }
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
index ea497ff47..ab3576eb9 100644
--- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.JsonUtils;
@@ -32,10 +33,12 @@ import org.apache.seatunnel.common.utils.TimeUtils;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import lombok.Builder;
 import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
 
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -55,13 +58,10 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
     @Override
     public SeaTunnelRow deserialize(byte[] message) throws IOException {
         String content = new String(message);
-        String[] splits = content.split(delimiter);
-        if (seaTunnelRowType.getTotalFields() != splits.length) {
-            throw new IndexOutOfBoundsException("The data does not match the configured schema information, please check");
-        }
-        Object[] objects = new Object[splits.length];
-        for (int i = 0; i < splits.length; i++) {
-            objects[i] = convert(splits[i], seaTunnelRowType.getFieldType(i));
+        Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(content, seaTunnelRowType);
+        Object[] objects = new Object[splitsMap.size()];
+        for (int i = 0; i < objects.length; i++) {
+            objects[i] = convert(splitsMap.get(i), seaTunnelRowType.getFieldType(i));
         }
         return new SeaTunnelRow(objects);
     }
@@ -71,7 +71,30 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
         return seaTunnelRowType;
     }
 
+    private Map<Integer, String> splitLineBySeaTunnelRowType(String line, SeaTunnelRowType seaTunnelRowType) {
+        String[] splits = line.split(delimiter, -1);
+        LinkedHashMap<Integer, String> splitsMap = new LinkedHashMap<>();
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        int cursor = 0;
+        for (int i = 0; i < fieldTypes.length; i++) {
+            if (fieldTypes[i].getSqlType() == SqlType.ROW) {
+                // row type
+                int totalFields = ((SeaTunnelRowType) fieldTypes[i]).getTotalFields();
+                ArrayList<String> rowSplits = new ArrayList<>(Arrays.asList(splits).subList(cursor, cursor + totalFields));
+                splitsMap.put(i, String.join(delimiter, rowSplits));
+                cursor += totalFields;
+            } else {
+                // not row type
+                splitsMap.put(i, splits[cursor++]);
+            }
+        }
+        return splitsMap;
+    }
+
     private Object convert(String field, SeaTunnelDataType<?> fieldType) {
+        if (StringUtils.isBlank(field)) {
+            return null;
+        }
         switch (fieldType.getSqlType()) {
             case ARRAY:
                 BasicType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
@@ -134,8 +157,15 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
                 return TimeUtils.parse(field, timeFormatter);
             case TIMESTAMP:
                 return DateTimeUtils.parse(field, dateTimeFormatter);
+            case ROW:
+                Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(field, (SeaTunnelRowType) fieldType);
+                Object[] objects = new Object[splitsMap.size()];
+                for (int i = 0; i < objects.length; i++) {
+                    objects[i] = convert(splitsMap.get(i), ((SeaTunnelRowType) fieldType).getFieldType(i));
+                }
+                return new SeaTunnelRow(objects);
             default:
-                throw new UnsupportedOperationException("SeaTunnel format text not supported for parsing [SeaTunnelRow] type");
+                throw new UnsupportedOperationException("SeaTunnel format text not supported for parsing this type");
         }
     }
 }
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
index 5a9953252..d23d5c2cc 100644
--- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
@@ -28,7 +28,6 @@ import org.apache.seatunnel.common.utils.TimeUtils;
 
 import lombok.Builder;
 import lombok.NonNull;
-import org.apache.commons.lang3.StringUtils;
 
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -57,10 +56,13 @@ public class TextSerializationSchema implements SerializationSchema {
         for (int i = 0; i < fields.length; i++) {
             strings[i] = convert(fields[i], seaTunnelRowType.getFieldType(i));
         }
-        return StringUtils.join(strings, delimiter).getBytes();
+        return String.join(delimiter, strings).getBytes();
     }
 
     private String convert(Object field, SeaTunnelDataType<?> fieldType) {
+        if (field == null) {
+            return "";
+        }
         switch (fieldType.getSqlType()) {
             case ARRAY:
             case MAP:
@@ -85,8 +87,15 @@ public class TextSerializationSchema implements SerializationSchema {
                 return "";
             case BYTES:
                 return new String((byte[]) field);
+            case ROW:
+                Object[] fields = ((SeaTunnelRow) field).getFields();
+                String[] strings = new String[fields.length];
+                for (int i = 0; i < fields.length; i++) {
+                    strings[i] = convert(fields[i], ((SeaTunnelRowType) fieldType).getFieldType(i));
+                }
+                return String.join(delimiter, strings);
             default:
-                throw new UnsupportedOperationException("SeaTunnel format text not supported for parsing [SeaTunnelRow] type");
+                throw new UnsupportedOperationException("SeaTunnel format text not supported for parsing this type");
         }
     }
 }