You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/09/14 15:03:20 UTC

[incubator-seatunnel] branch dev updated: [hotfix][translation] fix date,time,timestamp type convert for spark (#2637)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3b5927584 [hotfix][translation] fix date,time,timestamp type convert for spark (#2637)
3b5927584 is described below

commit 3b5927584db5302bc512801d9f82f4451d5891ca
Author: Laglangyue <35...@users.noreply.github.com>
AuthorDate: Wed Sep 14 23:03:12 2022 +0800

    [hotfix][translation] fix date,time,timestamp type convert for spark (#2637)
    
    * [hotfix] date,time,timestamp convert for spark
    
    * fix the instant convert
    
    * revert to nullable field value
    
    * Update exception info
    
    * Update exception info
    
    Co-authored-by: laglangyue <la...@qq.com>
    Co-authored-by: Zongwen Li <zo...@gmail.com>
---
 .../src/main/resources/examples/spark.batch.conf   |  3 +-
 .../common/serialization/InternalRowConverter.java | 21 ++++-----
 .../spark/common/utils/InstantConverterUtils.java  | 51 ++++++++++++++++++++++
 .../spark/common/utils/TypeConverterUtils.java     |  4 +-
 4 files changed, 65 insertions(+), 14 deletions(-)

diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 25ae6b567..00b2338c2 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -49,7 +49,6 @@ source {
         c_null = "null"
         c_bytes = bytes
         c_date = date
-        c_time = time
         c_timestamp = timestamp
       }
     }
@@ -72,7 +71,7 @@ transform {
 
   # you can also use other transform plugins, such as sql
   sql {
-    sql = "select c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes from fake"
+    sql = "select c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes,c_date,c_timestamp from fake"
     result_table_name = "sql"
   }
 
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
index e95989ec8..31673c595 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.translation.serialization.RowConverter;
+import org.apache.seatunnel.translation.spark.common.utils.InstantConverterUtils;
 import org.apache.seatunnel.translation.spark.common.utils.TypeConverterUtils;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -43,11 +44,9 @@ import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.sql.Date;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
-import java.time.LocalTime;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.BiFunction;
@@ -74,13 +73,12 @@ public final class InternalRowConverter extends RowConverter<InternalRow> {
                 SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
                 return convert(seaTunnelRow, rowType);
             case DATE:
-                return Date.valueOf((LocalDate) field);
+                return (int) ((LocalDate) field).toEpochDay();
             case TIME:
-                //TODO: how reconvert?
-                LocalTime localTime = (LocalTime) field;
-                return Timestamp.valueOf(LocalDateTime.of(LocalDate.ofEpochDay(0), localTime));
+                // TODO: Support TIME Type
+                throw new RuntimeException("time type is not supported now, but will be supported in the future.");
             case TIMESTAMP:
-                return Timestamp.valueOf((LocalDateTime) field);
+                return InstantConverterUtils.toEpochMicro(Timestamp.valueOf((LocalDateTime) field).toInstant());
             case MAP:
                 return convertMap((Map<?, ?>) field, (MapType<?, ?>) dataType, InternalRowConverter::convert);
             case STRING:
@@ -141,8 +139,10 @@ public final class InternalRowConverter extends RowConverter<InternalRow> {
             case SMALLINT:
                 return new MutableShort();
             case INT:
+            case DATE:
                 return new MutableInt();
             case BIGINT:
+            case TIMESTAMP:
                 return new MutableLong();
             case FLOAT:
                 return new MutableFloat();
@@ -167,11 +167,12 @@ public final class InternalRowConverter extends RowConverter<InternalRow> {
             case ROW:
                 return reconvert((InternalRow) field, (SeaTunnelRowType) dataType);
             case DATE:
-                return ((Date) field).toLocalDate();
+                return LocalDate.ofEpochDay((int) field);
             case TIME:
-                return ((Timestamp) field).toLocalDateTime().toLocalTime();
+                // TODO: Support TIME Type
+                throw new RuntimeException("time type is not supported now, but will be supported in the future.");
             case TIMESTAMP:
-                return ((Timestamp) field).toLocalDateTime();
+                return Timestamp.from(InstantConverterUtils.ofEpochMicro((long) field)).toLocalDateTime();
             case MAP:
                 return convertMap((Map<?, ?>) field, (MapType<?, ?>) dataType, InternalRowConverter::reconvert);
             case STRING:
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/utils/InstantConverterUtils.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/utils/InstantConverterUtils.java
new file mode 100644
index 000000000..37a90ee23
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/utils/InstantConverterUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.common.utils;
+
+import java.time.Instant;
+
+public final class InstantConverterUtils {
+
+    private static final long MICROS_PER_SECOND = 1_000_000L;
+    private static final int NANOS_PER_MICRO = 1_000;
+
+    /**
+     * Microsecond analogue of {@link Instant#toEpochMilli()}; sub-microsecond nanos are floored.
+     * @throws ArithmeticException if the result overflows a {@code long}
+     */
+    public static Long toEpochMicro(Instant instant) {
+        long seconds = instant.getEpochSecond();
+        int nanos = instant.getNano();
+        if (seconds < 0 && nanos > 0) {
+            // as in Instant#toEpochMilli: avoid overflowing the intermediate product when the result fits
+            long micros = Math.multiplyExact(seconds + 1, MICROS_PER_SECOND);
+            return Math.addExact(micros, nanos / NANOS_PER_MICRO - MICROS_PER_SECOND);
+        }
+        long micros = Math.multiplyExact(seconds, MICROS_PER_SECOND);
+        return Math.addExact(micros, nanos / NANOS_PER_MICRO);
+    }
+
+    /** Microsecond analogue of {@link Instant#ofEpochMilli(long)}: epoch microseconds back to an Instant. */
+    public static Instant ofEpochMicro(long epochMicro) {
+        long secs = Math.floorDiv(epochMicro, MICROS_PER_SECOND);
+        long nanos = Math.floorMod(epochMicro, MICROS_PER_SECOND) * NANOS_PER_MICRO;
+        return Instant.ofEpochSecond(secs, nanos);
+    }
+
+    private InstantConverterUtils() { }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/utils/TypeConverterUtils.java
index 36260be05..493fd9a9c 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/utils/TypeConverterUtils.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/utils/TypeConverterUtils.java
@@ -85,8 +85,8 @@ public class TypeConverterUtils {
                 return DataTypes.BinaryType;
             case DATE:
                 return DataTypes.DateType;
-            case TIME:
-                // TODO: how reconvert?
+            // case TIME:
+                // TODO: not support now, how reconvert?
             case TIMESTAMP:
                 return DataTypes.TimestampType;
             case ARRAY: