You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/05 12:51:26 UTC
[incubator-seatunnel] branch dev updated: [Improve][format][text] Support read & write SeaTunnelRow type (#2969)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 50a725490 [Improve][format][text] Support read & write SeaTunnelRow type (#2969)
50a725490 is described below
commit 50a725490255f9bac96aa1cadd7f90d7302f1464
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Wed Oct 5 20:51:21 2022 +0800
[Improve][format][text] Support read & write SeaTunnelRow type (#2969)
---
.../seatunnel/common/utils/DateTimeUtils.java | 11 ++++++
.../apache/seatunnel/common/utils/DateUtils.java | 11 ++++++
.../apache/seatunnel/common/utils/TimeUtils.java | 11 ++++++
.../format/text/TextDeserializationSchema.java | 46 ++++++++++++++++++----
.../format/text/TextSerializationSchema.java | 15 +++++--
5 files changed, 83 insertions(+), 11 deletions(-)
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java
index a23f174c0..ff69533f3 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java
@@ -56,5 +56,16 @@ public class DateTimeUtils {
public String getValue() {
return value;
}
+
+ public static Formatter parse(String format) {
+ Formatter[] formatters = Formatter.values();
+ for (Formatter formatter : formatters) {
+ if (formatter.getValue().equals(format)) {
+ return formatter;
+ }
+ }
+ String errorMsg = String.format("Illegal format [%s]", format);
+ throw new IllegalArgumentException(errorMsg);
+ }
}
}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java
index 7a67055ce..ed67f74d3 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java
@@ -52,5 +52,16 @@ public class DateUtils {
public String getValue() {
return value;
}
+
+ public static Formatter parse(String format) {
+ Formatter[] formatters = Formatter.values();
+ for (Formatter formatter : formatters) {
+ if (formatter.getValue().equals(format)) {
+ return formatter;
+ }
+ }
+ String errorMsg = String.format("Illegal format [%s]", format);
+ throw new IllegalArgumentException(errorMsg);
+ }
}
}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TimeUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TimeUtils.java
index 8d0e1584b..76ee5c5e4 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TimeUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TimeUtils.java
@@ -50,5 +50,16 @@ public class TimeUtils {
public String getValue() {
return value;
}
+
+ public static Formatter parse(String format) {
+ Formatter[] formatters = Formatter.values();
+ for (Formatter formatter : formatters) {
+ if (formatter.getValue().equals(format)) {
+ return formatter;
+ }
+ }
+ String errorMsg = String.format("Illegal format [%s]", format);
+ throw new IllegalArgumentException(errorMsg);
+ }
}
}
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
index ea497ff47..ab3576eb9 100644
--- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
@@ -32,10 +33,12 @@ import org.apache.seatunnel.common.utils.TimeUtils;
import com.fasterxml.jackson.databind.node.ArrayNode;
import lombok.Builder;
import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -55,13 +58,10 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
@Override
public SeaTunnelRow deserialize(byte[] message) throws IOException {
String content = new String(message);
- String[] splits = content.split(delimiter);
- if (seaTunnelRowType.getTotalFields() != splits.length) {
- throw new IndexOutOfBoundsException("The data does not match the configured schema information, please check");
- }
- Object[] objects = new Object[splits.length];
- for (int i = 0; i < splits.length; i++) {
- objects[i] = convert(splits[i], seaTunnelRowType.getFieldType(i));
+ Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(content, seaTunnelRowType);
+ Object[] objects = new Object[splitsMap.size()];
+ for (int i = 0; i < objects.length; i++) {
+ objects[i] = convert(splitsMap.get(i), seaTunnelRowType.getFieldType(i));
}
return new SeaTunnelRow(objects);
}
@@ -71,7 +71,30 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
return seaTunnelRowType;
}
+ private Map<Integer, String> splitLineBySeaTunnelRowType(String line, SeaTunnelRowType seaTunnelRowType) {
+ String[] splits = line.split(delimiter, -1);
+ LinkedHashMap<Integer, String> splitsMap = new LinkedHashMap<>();
+ SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+ int cursor = 0;
+ for (int i = 0; i < fieldTypes.length; i++) {
+ if (fieldTypes[i].getSqlType() == SqlType.ROW) {
+ // row type
+ int totalFields = ((SeaTunnelRowType) fieldTypes[i]).getTotalFields();
+ ArrayList<String> rowSplits = new ArrayList<>(Arrays.asList(splits).subList(cursor, cursor + totalFields));
+ splitsMap.put(i, String.join(delimiter, rowSplits));
+ cursor += totalFields;
+ } else {
+ // not row type
+ splitsMap.put(i, splits[cursor++]);
+ }
+ }
+ return splitsMap;
+ }
+
private Object convert(String field, SeaTunnelDataType<?> fieldType) {
+ if (StringUtils.isBlank(field)) {
+ return null;
+ }
switch (fieldType.getSqlType()) {
case ARRAY:
BasicType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
@@ -134,8 +157,15 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
return TimeUtils.parse(field, timeFormatter);
case TIMESTAMP:
return DateTimeUtils.parse(field, dateTimeFormatter);
+ case ROW:
+ Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(field, (SeaTunnelRowType) fieldType);
+ Object[] objects = new Object[splitsMap.size()];
+ for (int i = 0; i < objects.length; i++) {
+ objects[i] = convert(splitsMap.get(i), ((SeaTunnelRowType) fieldType).getFieldType(i));
+ }
+ return new SeaTunnelRow(objects);
default:
- throw new UnsupportedOperationException("SeaTunnel format text not supported for parsing [SeaTunnelRow] type");
+ throw new UnsupportedOperationException("SeaTunnel format text not supported for parsing this type");
}
}
}
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
index 5a9953252..d23d5c2cc 100644
--- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
@@ -28,7 +28,6 @@ import org.apache.seatunnel.common.utils.TimeUtils;
import lombok.Builder;
import lombok.NonNull;
-import org.apache.commons.lang3.StringUtils;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -57,10 +56,13 @@ public class TextSerializationSchema implements SerializationSchema {
for (int i = 0; i < fields.length; i++) {
strings[i] = convert(fields[i], seaTunnelRowType.getFieldType(i));
}
- return StringUtils.join(strings, delimiter).getBytes();
+ return String.join(delimiter, strings).getBytes();
}
private String convert(Object field, SeaTunnelDataType<?> fieldType) {
+ if (field == null) {
+ return "";
+ }
switch (fieldType.getSqlType()) {
case ARRAY:
case MAP:
@@ -85,8 +87,15 @@ public class TextSerializationSchema implements SerializationSchema {
return "";
case BYTES:
return new String((byte[]) field);
+ case ROW:
+ Object[] fields = ((SeaTunnelRow) field).getFields();
+ String[] strings = new String[fields.length];
+ for (int i = 0; i < fields.length; i++) {
+ strings[i] = convert(fields[i], ((SeaTunnelRowType) fieldType).getFieldType(i));
+ }
+ return String.join(delimiter, strings);
default:
- throw new UnsupportedOperationException("SeaTunnel format text not supported for parsing [SeaTunnelRow] type");
+ throw new UnsupportedOperationException("SeaTunnel format text not supported for parsing this type");
}
}
}