You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/01 09:30:20 UTC

[incubator-seatunnel] branch dev updated: [Hotfix][Connector-V2][ES] Source deserializer error and inappropriate (#4233)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 15530d278 [Hotfix][Connector-V2][ES] Source deserializer error and inappropriate (#4233)
15530d278 is described below

commit 15530d2785ff98e8560951f59876104e7d6f321c
Author: wyc <wj...@outlook.com>
AuthorDate: Sat Apr 1 17:30:12 2023 +0800

    [Hotfix][Connector-V2][ES] Source deserializer error and inappropriate (#4233)
    
    * [Fix][Connector-V2][ES]source deserializer error and inappropriate
    
    * [WIP][Connector-V2][ES]try fix e2e
    
    * [WIP][Connector-V2][ES]try fix e2e
    
    * [WIP][Connector-V2][ES]try fix e2e
    
    * [WIP][Connector-V2][ES]try fix e2e
    
    * [Fix][Connector-V2][ES]es special types may result in a null pointer. e.g. ip type
    
    * [Fix][Connector-V2][ES]fix the parsing exception when the es time type is epoch_millis
---
 .../source/DefaultSeaTunnelRowDeserializer.java         | 17 ++++++++++++++++-
 .../e2e/connector/elasticsearch/ElasticsearchIT.java    | 14 ++++++++++----
 2 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index 0cd06dcaa..5a4858df9 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingExcep
 import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
 
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -37,7 +38,9 @@ import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.Elastic
 
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
+import java.time.Instant;
 import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.Base64;
 import java.util.HashMap;
@@ -111,7 +114,12 @@ public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer
                 value = recursiveGet(rowRecord.getDoc(), fieldName);
                 if (value != null) {
                     seaTunnelDataType = rowTypeInfo.getFieldType(i);
-                    seaTunnelFields[i] = convertValue(seaTunnelDataType, value.toString());
+                    if (value instanceof TextNode) {
+                        seaTunnelFields[i] =
+                                convertValue(seaTunnelDataType, ((TextNode) value).textValue());
+                    } else {
+                        seaTunnelFields[i] = convertValue(seaTunnelDataType, value.toString());
+                    }
                 }
             }
         } catch (Exception ex) {
@@ -188,6 +196,13 @@ public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer
     }
 
     private LocalDateTime parseDate(String fieldValue) {
+        // handle strings of timestamp type
+        try {
+            long ts = Long.parseLong(fieldValue);
+            return LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault());
+        } catch (NumberFormatException e) {
+            // no op
+        }
         String formatDate = fieldValue.replace("T", " ");
         if (fieldValue.length() == "yyyyMMdd".length()
                 || fieldValue.length() == "yyyy-MM-dd".length()) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index ec9e77847..793fa2513 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -44,10 +44,10 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.net.UnknownHostException;
 import java.time.Duration;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -131,8 +131,7 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
         Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
     }
 
-    private List<String> generateTestDataSet()
-            throws JsonProcessingException, UnknownHostException {
+    private List<String> generateTestDataSet() throws JsonProcessingException {
         String[] fields =
                 new String[] {
                     "c_map",
@@ -170,7 +169,7 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
                         BigDecimal.valueOf(11, 1),
                         "test".getBytes(),
                         LocalDate.now().toString(),
-                        LocalDateTime.now().toString()
+                        System.currentTimeMillis()
                     };
             for (int j = 0; j < fields.length; j++) {
                 doc.put(fields[j], values[j]);
@@ -215,6 +214,13 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
                             x.remove("_index");
                             x.remove("_type");
                             x.remove("_id");
+                            // I don’t know if converting the test cases in this way complies with
+                            // the CI specification
+                            x.replace(
+                                    "c_timestamp",
+                                    LocalDateTime.parse(x.get("c_timestamp").toString())
+                                            .toInstant(ZoneOffset.UTC)
+                                            .toEpochMilli());
                         });
         List<String> docs =
                 scrollResult.getDocs().stream()