You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/01 09:30:20 UTC
[incubator-seatunnel] branch dev updated: [Hotfix][Connector-V2][ES] Source deserializer error and inappropriate (#4233)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 15530d278 [Hotfix][Connector-V2][ES] Source deserializer error and inappropriate (#4233)
15530d278 is described below
commit 15530d2785ff98e8560951f59876104e7d6f321c
Author: wyc <wj...@outlook.com>
AuthorDate: Sat Apr 1 17:30:12 2023 +0800
[Hotfix][Connector-V2][ES] Source deserializer error and inappropriate (#4233)
* [Fix][Connector-V2][ES]source deserializer error and inappropriate
* [WIP][Connector-V2][ES]try fix e2e
* [WIP][Connector-V2][ES]try fix e2e
* [WIP][Connector-V2][ES]try fix e2e
* [WIP][Connector-V2][ES]try fix e2e
* [Fix][Connector-V2][ES]es special types may result in a null pointer. e.g. ip type
* [Fix][Connector-V2][ES]fix the parsing exception when the es time type is epoch_millis
---
.../source/DefaultSeaTunnelRowDeserializer.java | 17 ++++++++++++++++-
.../e2e/connector/elasticsearch/ElasticsearchIT.java | 14 ++++++++++----
2 files changed, 26 insertions(+), 5 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index 0cd06dcaa..5a4858df9 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingExcep
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -37,7 +38,9 @@ import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.Elastic
import java.lang.reflect.Array;
import java.math.BigDecimal;
+import java.time.Instant;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.HashMap;
@@ -111,7 +114,12 @@ public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer
value = recursiveGet(rowRecord.getDoc(), fieldName);
if (value != null) {
seaTunnelDataType = rowTypeInfo.getFieldType(i);
- seaTunnelFields[i] = convertValue(seaTunnelDataType, value.toString());
+ if (value instanceof TextNode) {
+ seaTunnelFields[i] =
+ convertValue(seaTunnelDataType, ((TextNode) value).textValue());
+ } else {
+ seaTunnelFields[i] = convertValue(seaTunnelDataType, value.toString());
+ }
}
}
} catch (Exception ex) {
@@ -188,6 +196,13 @@ public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer
}
private LocalDateTime parseDate(String fieldValue) {
// Handle timestamp values serialized as epoch-millisecond strings (e.g. the ES epoch_millis date format)
+ try {
+ long ts = Long.parseLong(fieldValue);
+ return LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault());
+ } catch (NumberFormatException e) {
+ // no op
+ }
String formatDate = fieldValue.replace("T", " ");
if (fieldValue.length() == "yyyyMMdd".length()
|| fieldValue.length() == "yyyy-MM-dd".length()) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index ec9e77847..793fa2513 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -44,10 +44,10 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.math.BigDecimal;
-import java.net.UnknownHostException;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -131,8 +131,7 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}
- private List<String> generateTestDataSet()
- throws JsonProcessingException, UnknownHostException {
+ private List<String> generateTestDataSet() throws JsonProcessingException {
String[] fields =
new String[] {
"c_map",
@@ -170,7 +169,7 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
BigDecimal.valueOf(11, 1),
"test".getBytes(),
LocalDate.now().toString(),
- LocalDateTime.now().toString()
+ System.currentTimeMillis()
};
for (int j = 0; j < fields.length; j++) {
doc.put(fields[j], values[j]);
@@ -215,6 +214,13 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
x.remove("_index");
x.remove("_type");
x.remove("_id");
// TODO(review): confirm that normalizing c_timestamp to epoch millis in the
// expected dataset complies with the CI test conventions
+ x.replace(
+ "c_timestamp",
+ LocalDateTime.parse(x.get("c_timestamp").toString())
+ .toInstant(ZoneOffset.UTC)
+ .toEpochMilli());
});
List<String> docs =
scrollResult.getDocs().stream()