You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/09/14 15:03:20 UTC
[incubator-seatunnel] branch dev updated: [hotfix][translation] fix data,time,timestamp type convert for spark (#2637)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3b5927584 [hotfix][translation] fix data,time,timestamp type convert for spark (#2637)
3b5927584 is described below
commit 3b5927584db5302bc512801d9f82f4451d5891ca
Author: Laglangyue <35...@users.noreply.github.com>
AuthorDate: Wed Sep 14 23:03:12 2022 +0800
[hotfix][translation] fix data,time,timestamp type convert for spark (#2637)
* [hotfix] date, time, timestamp conversion for spark
* fix the instant conversion
* revert to nullable field value
* Update exception info
* Update exception info
Co-authored-by: laglangyue <la...@qq.com>
Co-authored-by: Zongwen Li <zo...@gmail.com>
---
.../src/main/resources/examples/spark.batch.conf | 3 +-
.../common/serialization/InternalRowConverter.java | 21 ++++-----
.../spark/common/utils/InstantConverterUtils.java | 51 ++++++++++++++++++++++
.../spark/common/utils/TypeConverterUtils.java | 4 +-
4 files changed, 65 insertions(+), 14 deletions(-)
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 25ae6b567..00b2338c2 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -49,7 +49,6 @@ source {
c_null = "null"
c_bytes = bytes
c_date = date
- c_time = time
c_timestamp = timestamp
}
}
@@ -72,7 +71,7 @@ transform {
# you can also use other transform plugins, such as sql
sql {
- sql = "select c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes from fake"
+ sql = "select c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes,c_date,c_timestamp from fake"
result_table_name = "sql"
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
index e95989ec8..31673c595 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.translation.serialization.RowConverter;
+import org.apache.seatunnel.translation.spark.common.utils.InstantConverterUtils;
import org.apache.seatunnel.translation.spark.common.utils.TypeConverterUtils;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -43,11 +44,9 @@ import org.apache.spark.unsafe.types.UTF8String;
import java.io.IOException;
import java.math.BigDecimal;
-import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
-import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
@@ -74,13 +73,12 @@ public final class InternalRowConverter extends RowConverter<InternalRow> {
SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
return convert(seaTunnelRow, rowType);
case DATE:
- return Date.valueOf((LocalDate) field);
+ return (int) ((LocalDate) field).toEpochDay();
case TIME:
- //TODO: how reconvert?
- LocalTime localTime = (LocalTime) field;
- return Timestamp.valueOf(LocalDateTime.of(LocalDate.ofEpochDay(0), localTime));
+ // TODO: Support TIME Type
+ throw new RuntimeException("time type is not supported now, but will be supported in the future.");
case TIMESTAMP:
- return Timestamp.valueOf((LocalDateTime) field);
+ return InstantConverterUtils.toEpochMicro(Timestamp.valueOf((LocalDateTime) field).toInstant());
case MAP:
return convertMap((Map<?, ?>) field, (MapType<?, ?>) dataType, InternalRowConverter::convert);
case STRING:
@@ -141,8 +139,10 @@ public final class InternalRowConverter extends RowConverter<InternalRow> {
case SMALLINT:
return new MutableShort();
case INT:
+ case DATE:
return new MutableInt();
case BIGINT:
+ case TIMESTAMP:
return new MutableLong();
case FLOAT:
return new MutableFloat();
@@ -167,11 +167,12 @@ public final class InternalRowConverter extends RowConverter<InternalRow> {
case ROW:
return reconvert((InternalRow) field, (SeaTunnelRowType) dataType);
case DATE:
- return ((Date) field).toLocalDate();
+ return LocalDate.ofEpochDay((int) field);
case TIME:
- return ((Timestamp) field).toLocalDateTime().toLocalTime();
+ // TODO: Support TIME Type
+ throw new RuntimeException("time type is not supported now, but will be supported in the future.");
case TIMESTAMP:
- return ((Timestamp) field).toLocalDateTime();
+ return Timestamp.from(InstantConverterUtils.ofEpochMicro((long) field)).toLocalDateTime();
case MAP:
return convertMap((Map<?, ?>) field, (MapType<?, ?>) dataType, InternalRowConverter::reconvert);
case STRING:
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/utils/InstantConverterUtils.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/utils/InstantConverterUtils.java
new file mode 100644
index 000000000..37a90ee23
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/utils/InstantConverterUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.common.utils;
+
+import java.time.Instant;
+
+public class InstantConverterUtils { // static helpers converting java.time.Instant <-> epoch microseconds (long)
+
+ private static final int MICRO_OF_SECOND = 1000_000; // microseconds per second
+ private static final int MICRO_OF_NANOS = 1000; // nanoseconds per microsecond (despite the name order)
+
+ /**
+ * Converts an {@link Instant} to the number of microseconds elapsed since the
+ * epoch second 0, analogous to {@link Instant#toEpochMilli()} but with
+ * microsecond resolution. Nanoseconds below one microsecond are dropped by
+ * integer division.
+ *
+ * @param instant the instant to convert; must not be null
+ * @return the epoch-microsecond value (boxed)
+ * @throws ArithmeticException if the result does not fit in a long
+ *         (raised by the Math.*Exact calls)
+ * @see Instant#toEpochMilli()
+ */
+ public static Long toEpochMicro(Instant instant) {
+ long seconds = instant.getEpochSecond();
+ int nanos = instant.getNano();
+ if (seconds < 0 && nanos > 0) { // negative instant with a fractional part
+ long micro = Math.multiplyExact(seconds + 1, MICRO_OF_SECOND); // scale from the next whole second; presumably to avoid a spurious overflow near Long.MIN_VALUE
+ long adjustment = nanos / MICRO_OF_NANOS - MICRO_OF_SECOND; // negative offset back from that second
+ return Math.addExact(micro, adjustment);
+ } else {
+ long millis = Math.multiplyExact(seconds, MICRO_OF_SECOND); // NOTE: holds microseconds, not milliseconds, despite the name
+ return Math.addExact(millis, nanos / MICRO_OF_NANOS);
+ }
+ }
+
+ /**
+ * Builds an {@link Instant} from a count of microseconds since epoch second 0,
+ * the inverse of {@link #toEpochMicro(Instant)} at microsecond resolution.
+ * Floor division and floor modulus are used so that negative inputs produce a
+ * non-negative microsecond-of-second part.
+ *
+ * @param epochMicro microseconds since the epoch; may be negative
+ * @return the corresponding instant
+ * @see Instant#ofEpochMilli(long)
+ */
+ public static Instant ofEpochMicro(long epochMicro) {
+ long secs = Math.floorDiv(epochMicro, MICRO_OF_SECOND); // whole seconds, rounded toward negative infinity
+ int mos = (int) Math.floorMod(epochMicro, MICRO_OF_SECOND); // microsecond-of-second, in [0, 999999]
+ return Instant.ofEpochSecond(secs, Math.multiplyExact(mos, MICRO_OF_NANOS)); // micros -> nanos adjustment
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/utils/TypeConverterUtils.java
index 36260be05..493fd9a9c 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/utils/TypeConverterUtils.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/utils/TypeConverterUtils.java
@@ -85,8 +85,8 @@ public class TypeConverterUtils {
return DataTypes.BinaryType;
case DATE:
return DataTypes.DateType;
- case TIME:
- // TODO: how reconvert?
+ // case TIME:
+ // TODO: not support now, how reconvert?
case TIMESTAMP:
return DataTypes.TimestampType;
case ARRAY: