You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by di...@apache.org on 2022/06/12 13:56:25 UTC

[incubator-doris-flink-connector] branch master updated: [Fix] fix flink date and timestamp type not mapping

This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aa8d4b  [Fix] fix flink date and timestamp type not mapping
7aa8d4b is described below

commit 7aa8d4bce84eb68a4bf2f842f24fdd6c897ff910
Author: gj-zhang <ra...@163.com>
AuthorDate: Sun Jun 12 21:56:19 2022 +0800

    [Fix] fix flink date and timestamp type not mapping
    
    * [FIX] fix flink date and timestamp type not mapping.
    * The type conversion here is recommended to be abstracted and placed in DorisRowConverter.
    * overload the constructor for the DorisRowConverter.
    add unit test for external convert
    * add license header.
---
 .../RowDataDeserializationSchema.java              |   2 +-
 .../converter/DorisRowConverter.java               | 133 ++++++++++++++++++++-
 .../apache/doris/flink/serialization/RowBatch.java |  34 +++++-
 .../doris/flink/sink/writer/RowDataSerializer.java |  15 +--
 .../doris/flink/table/DorisRowDataInputFormat.java |   2 +-
 .../doris/flink/DorisDateAndTimestampSqlTest.java  |  66 ++++++++++
 .../convert/DorisRowConverterTest.java             |  64 +++++++++-
 .../doris/flink/serialization/TestRowBatch.java    |  14 ++-
 8 files changed, 303 insertions(+), 27 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java
index f9c65b3..3342b66 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java
@@ -44,7 +44,7 @@ public class RowDataDeserializationSchema implements DorisDeserializationSchema<
 
     @Override
     public void deserialize(List<?> record, Collector<RowData> out) throws Exception {
-        RowData row = rowConverter.convert(record);
+        RowData row = rowConverter.convertInternal(record);
         out.collect(row);
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
index ee7a9a7..d1bb529 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -21,12 +21,20 @@ import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.sql.Date;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -35,12 +43,26 @@ public class DorisRowConverter implements Serializable {
 
     private static final long serialVersionUID = 1L;
     private final DeserializationConverter[] deserializationConverters;
+    private final SerializationConverter[] serializationConverters;
 
     public DorisRowConverter(RowType rowType) {
         checkNotNull(rowType);
         this.deserializationConverters = new DeserializationConverter[rowType.getFieldCount()];
+        this.serializationConverters = new SerializationConverter[rowType.getFieldCount()];
         for (int i = 0; i < rowType.getFieldCount(); i++) {
-            deserializationConverters[i] = createNullableConverter(rowType.getTypeAt(i));
+            deserializationConverters[i] = createNullableInternalConverter(rowType.getTypeAt(i));
+            serializationConverters[i] = createNullableExternalConverter(rowType.getTypeAt(i));
+        }
+    }
+
+    public DorisRowConverter(DataType[] dataTypes) {
+        checkNotNull(dataTypes);
+        this.deserializationConverters = new DeserializationConverter[dataTypes.length];
+        this.serializationConverters = new SerializationConverter[dataTypes.length];
+        for (int i = 0; i < dataTypes.length; i++) {
+            LogicalType logicalType = dataTypes[i].getLogicalType();
+            deserializationConverters[i] = createNullableInternalConverter(logicalType);
+            serializationConverters[i] = createNullableExternalConverter(logicalType);
         }
     }
 
@@ -49,7 +71,7 @@ public class DorisRowConverter implements Serializable {
      *
      * @param record from rowBatch
      */
-    public GenericRowData convert(List record){
+    public GenericRowData convertInternal(List record) {
         GenericRowData rowData = new GenericRowData(deserializationConverters.length);
         for (int i = 0; i < deserializationConverters.length ; i++) {
             rowData.setField(i, deserializationConverters[i].deserialize(record.get(i)));
@@ -57,13 +79,23 @@ public class DorisRowConverter implements Serializable {
         return rowData;
     }
 
+    /**
+     * Convert a single field from Flink internal {@link RowData} to an external Java object.
+     * @param rowData record from Flink internal row data
+     * @param index the field index
+     * @return java type value.
+     */
+    public Object convertExternal(RowData rowData, int index) {
+        return serializationConverters[index].serialize(index, rowData);
+    }
+
 
     /**
      * Create a nullable runtime {@link DeserializationConverter} from given {@link
      * LogicalType}.
      */
-    protected DeserializationConverter createNullableConverter(LogicalType type) {
-        return wrapIntoNullableInternalConverter(createConverter(type));
+    protected DeserializationConverter createNullableInternalConverter(LogicalType type) {
+        return wrapIntoNullableInternalConverter(createInternalConverter(type));
     }
 
     protected DeserializationConverter wrapIntoNullableInternalConverter(
@@ -77,6 +109,20 @@ public class DorisRowConverter implements Serializable {
         };
     }
 
+    protected SerializationConverter createNullableExternalConverter(LogicalType type) {
+        return wrapIntoNullableExternalConverter(createExternalConverter(type));
+    }
+
+    protected SerializationConverter wrapIntoNullableExternalConverter(SerializationConverter serializationConverter) {
+        return (index, val) -> {
+            if (val == null) {
+                return null;
+            } else {
+                return serializationConverter.serialize(index, val);
+            }
+        };
+    }
+
     /** Runtime converter to convert doris field to {@link RowData} type object. */
     @FunctionalInterface
     interface DeserializationConverter extends Serializable {
@@ -88,7 +134,15 @@ public class DorisRowConverter implements Serializable {
         Object deserialize(Object field);
     }
 
-    protected DeserializationConverter createConverter(LogicalType type) {
+    /**
+     * Runtime converter to convert {@link RowData} type object to doris field.
+     */
+    @FunctionalInterface
+    interface SerializationConverter extends Serializable {
+        Object serialize(int index, RowData field);
+    }
+
+    protected DeserializationConverter createInternalConverter(LogicalType type) {
         switch (type.getTypeRoot()) {
             case NULL:
                 return val -> null;
@@ -109,7 +163,21 @@ public class DorisRowConverter implements Serializable {
             case TIMESTAMP_WITH_TIME_ZONE:
             case TIMESTAMP_WITHOUT_TIME_ZONE:
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return val -> {
+                    if (val instanceof LocalDateTime) {
+                        return TimestampData.fromLocalDateTime((LocalDateTime) val);
+                    } else {
+                        throw new UnsupportedOperationException("timestamp type must be java.time.LocalDateTime, the actual type is: " + val.getClass().getName());
+                    }
+                };
             case DATE:
+                return val -> {
+                    if (val instanceof LocalDate) {
+                        return (int) ((LocalDate) val).toEpochDay();
+                    } else {
+                        throw new UnsupportedOperationException("date type must be java.time.LocalDate, the actual type is: " + val.getClass().getName());
+                    }
+                };
             case CHAR:
             case VARCHAR:
                 return val -> StringData.fromString((String) val);
@@ -125,4 +193,59 @@ public class DorisRowConverter implements Serializable {
                 throw new UnsupportedOperationException("Unsupported type:" + type);
         }
     }
+
+    protected SerializationConverter createExternalConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return ((index, val) -> null);
+            case CHAR:
+            case VARCHAR:
+                return (index, val) -> val.getString(index);
+            case BOOLEAN:
+                return (index, val) -> val.getBoolean(index);
+            case BINARY:
+            case VARBINARY:
+                return (index, val) -> val.getBinary(index);
+            case DECIMAL:
+                final int decimalPrecision = ((DecimalType) type).getPrecision();
+                final int decimalScale = ((DecimalType) type).getScale();
+                return (index, val) -> val.getDecimal(index, decimalPrecision, decimalScale);
+            case TINYINT:
+                return (index, val) -> val.getByte(index);
+            case SMALLINT:
+                return (index, val) -> val.getShort(index);
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_DAY_TIME:
+                return (index, val) -> val.getInt(index);
+            case BIGINT:
+                return (index, val) -> val.getLong(index);
+            case FLOAT:
+                return (index, val) -> val.getFloat(index);
+            case DOUBLE:
+                return (index, val) -> val.getDouble(index);
+            case DATE:
+                return (index, val) -> Date.valueOf(LocalDate.ofEpochDay(val.getInt(index)));
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                final int timestampPrecision = ((TimestampType) type).getPrecision();
+                return (index, val) -> val.getTimestamp(index, timestampPrecision).toTimestamp();
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int localP = ((LocalZonedTimestampType) type).getPrecision();
+                return (index, val) -> val.getTimestamp(index, localP).toTimestamp();
+            case TIMESTAMP_WITH_TIME_ZONE:
+                final int zonedP = ((ZonedTimestampType) type).getPrecision();
+                return (index, val) -> val.getTimestamp(index, zonedP).toTimestamp();
+            case ARRAY:
+            case MULTISET:
+            case MAP:
+            case ROW:
+            case STRUCTURED_TYPE:
+            case DISTINCT_TYPE:
+            case RAW:
+            case SYMBOL:
+            case UNRESOLVED:
+            default:
+                throw new UnsupportedOperationException("Unsupported type:" + type);
+        }
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 3be6f87..4dd6732 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -42,6 +42,9 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -79,6 +82,9 @@ public class RowBatch {
     private RootAllocator rootAllocator;
     private final Schema schema;
 
+    private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
     public List<Row> getRowBatch() {
         return rowBatch;
     }
@@ -243,8 +249,34 @@ public class RowBatch {
                         }
                         break;
                     case "DATE":
-                    case "LARGEINT":
+                        Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
+                                typeMismatchMessage(currentType, mt));
+                        VarCharVector date = (VarCharVector) curFieldVector;
+                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
+                            if (date.isNull(rowIndex)) {
+                                addValueToRow(rowIndex, null);
+                                continue;
+                            }
+                            String value = new String(date.get(rowIndex));
+                            LocalDate localDate = LocalDate.parse(value, dateFormatter);
+                            addValueToRow(rowIndex, localDate);
+                        }
+                        break;
                     case "DATETIME":
+                        Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
+                                typeMismatchMessage(currentType, mt));
+                        VarCharVector timeStampSecVector = (VarCharVector) curFieldVector;
+                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
+                            if (timeStampSecVector.isNull(rowIndex)) {
+                                addValueToRow(rowIndex, null);
+                                continue;
+                            }
+                            String value = new String(timeStampSecVector.get(rowIndex));
+                            LocalDateTime parse = LocalDateTime.parse(value, dateTimeFormatter);
+                            addValueToRow(rowIndex, parse);
+                        }
+                        break;
+                    case "LARGEINT":
                     case "CHAR":
                     case "VARCHAR":
                     case "STRING":
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
index 4d692c0..5d00301 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
@@ -18,6 +18,7 @@
 package org.apache.doris.flink.sink.writer;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
@@ -33,18 +34,17 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
 import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
 import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
 import static org.apache.doris.flink.sink.writer.LoadConstants.NULL_VALUE;
-import static org.apache.flink.table.data.RowData.createFieldGetter;
 
 /**
  * Serializer for RowData.
  */
 public class RowDataSerializer implements DorisRecordSerializer<RowData> {
     String[] fieldNames;
-    RowData.FieldGetter[] fieldGetters;
     String type;
     private ObjectMapper objectMapper;
     private final String fieldDelimiter;
     private final boolean enableDelete;
+    private final DorisRowConverter rowConverter;
 
     private RowDataSerializer(String[] fieldNames, DataType[] dataTypes, String type, String fieldDelimiter, boolean enableDelete) {
         this.fieldNames = fieldNames;
@@ -54,15 +54,12 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
         if (JSON.equals(type)) {
             objectMapper = new ObjectMapper();
         }
-        this.fieldGetters = new RowData.FieldGetter[dataTypes.length];
-        for (int fieldIndex = 0; fieldIndex < dataTypes.length; fieldIndex++) {
-            fieldGetters[fieldIndex] = createFieldGetter(dataTypes[fieldIndex].getLogicalType(), fieldIndex);
-        }
+        this.rowConverter = new DorisRowConverter(dataTypes);
     }
 
     @Override
     public byte[] serialize(RowData record) throws IOException{
-        int maxIndex = Math.min(record.getArity(), fieldGetters.length);
+        int maxIndex = Math.min(record.getArity(), fieldNames.length);
         String valString;
         if (JSON.equals(type)) {
             valString = buildJsonString(record, maxIndex);
@@ -78,7 +75,7 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
         int fieldIndex = 0;
         Map<String, String> valueMap = new HashMap<>();
         while (fieldIndex < maxIndex) {
-            Object field = fieldGetters[fieldIndex].getFieldOrNull(record);
+            Object field = rowConverter.convertExternal(record, fieldIndex);
             String value = field != null ? field.toString() : null;
             valueMap.put(fieldNames[fieldIndex], value);
             fieldIndex++;
@@ -93,7 +90,7 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
         int fieldIndex = 0;
         StringJoiner joiner = new StringJoiner(fieldDelimiter);
         while (fieldIndex < maxIndex) {
-            Object field = fieldGetters[fieldIndex].getFieldOrNull(record);
+            Object field = rowConverter.convertExternal(record, fieldIndex);
             String value = field != null ? field.toString() : NULL_VALUE;
             joiner.add(value);
             fieldIndex++;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
index fbcaeea..16cf2ee 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
@@ -148,7 +148,7 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
             return null;
         }
         List next = (List) scalaValueReader.next();
-        RowData genericRowData = rowConverter.convert(next);
+        RowData genericRowData = rowConverter.convertInternal(next);
         //update hasNext after we've read the record
         hasNext = scalaValueReader.hasNext();
         return genericRowData;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisDateAndTimestampSqlTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisDateAndTimestampSqlTest.java
new file mode 100644
index 0000000..d135995
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisDateAndTimestampSqlTest.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+import java.util.UUID;
+
+public class DorisDateAndTimestampSqlTest {
+
+    public static void main(String[] args) {
+        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv.executeSql("create table test_source ( " +
+                "        id INT, " +
+                "        score DECIMAL(10, 9), " +
+                "        submit_time TIMESTAMP " +
+                "        ) with ( " +
+                "        'password'='', " +
+                "        'connector'='doris', " +
+                "        'fenodes'='FE_HOST:FE_PORT', " +
+                "        'table.identifier'='db.source_table', " +
+                "        'username'='root' " +
+                ")");
+
+        tEnv.executeSql("create table test_sink ( " +
+                "        id INT, " +
+                "        score DECIMAL(10, 9), " +
+                "        submit_time DATE " +
+                "        ) with ( " +
+                "        'password'='', " +
+                "        'connector'='doris', " +
+                "        'fenodes'='FE_HOST:FE_PORT', " +
+                "        'sink.label-prefix' = 'label_" + UUID.randomUUID()+"' , " +
+                "        'table.identifier'='db.sink_table', " +
+                "        'username'='root' " +
+                ")");
+        tEnv.executeSql(
+                "insert into " +
+                        "    test_sink " +
+                        "select " +
+                        "    id, " +
+                        "    score," +
+                        "    to_date(DATE_FORMAT(submit_time, 'yyyy-MM-dd')) as submit_time " +
+                        "from " +
+                        "    test_source " +
+                        "where " +
+                        "    submit_time>='2022-05-31 00:00:00'")
+                .print();
+    }
+
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
index 3489399..e56a40e 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
@@ -17,23 +17,33 @@
 package org.apache.doris.flink.deserialization.convert;
 
 import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
+import org.apache.doris.flink.sink.writer.RowDataSerializer;
+import org.apache.doris.flink.sink.writer.RowDataSerializer.Builder;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 public class DorisRowConverterTest implements Serializable {
 
     @Test
-    public void testConvert(){
+    public void testConvert() throws IOException {
         ResolvedSchema SCHEMA =
                 ResolvedSchema.of(
                         Column.physical("f1", DataTypes.NULL()),
@@ -55,8 +65,54 @@ public class DorisRowConverterTest implements Serializable {
 
         DorisRowConverter converter = new DorisRowConverter((RowType) SCHEMA.toPhysicalRowDataType().getLogicalType());
 
-        List record = Arrays.asList(null,"true",1.2,1.2345,24,10,1,32,64,128, BigDecimal.valueOf(10.123),"2021-01-01 08:00:00","2021-01-01 08:00:00","2021-01-01","a","doris");
-        GenericRowData rowData = converter.convert(record);
-        Assert.assertEquals("+I(null,true,1.2,1.2345,24,10,1,32,64,128,10.12,2021-01-01 08:00:00,2021-01-01 08:00:00,2021-01-01,a,doris)",rowData.toString());
+        LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
+        LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
+        LocalDate date1 = LocalDate.of(2021, 1, 1);
+        List record = Arrays.asList(null,true,1.2F,1.2345D,24,10,(byte) 1, (short) 32,64,128L, BigDecimal.valueOf(10.123),time1,time2, date1,"a","doris");
+        GenericRowData rowData = converter.convertInternal(record);
+
+        RowDataSerializer serializer = new Builder()
+                .setFieldType(SCHEMA.getColumnDataTypes().toArray(new DataType[0]))
+                .setType("csv")
+                .setFieldDelimiter("|")
+                .setFieldNames(new String[]{"f1","f2","f3","f4","f5","f6","f7","f8","f9","f10","f11","f12","f13","f14","f15","f16"})
+                .build();
+        String s = new String(serializer.serialize(rowData));
+        Assert.assertEquals("\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:00:00.0|2021-01-01 08:00:00.0|2021-01-01|a|doris", s);
+    }
+
+    @Test
+    public void testExternalConvert() {
+        ResolvedSchema SCHEMA =
+                ResolvedSchema.of(
+                        Column.physical("f1", DataTypes.NULL()),
+                        Column.physical("f2", DataTypes.BOOLEAN()),
+                        Column.physical("f3", DataTypes.FLOAT()),
+                        Column.physical("f4", DataTypes.DOUBLE()),
+                        Column.physical("f5", DataTypes.INTERVAL(DataTypes.YEAR())),
+                        Column.physical("f6", DataTypes.INTERVAL(DataTypes.DAY())),
+                        Column.physical("f7", DataTypes.TINYINT()),
+                        Column.physical("f8", DataTypes.SMALLINT()),
+                        Column.physical("f9", DataTypes.INT()),
+                        Column.physical("f10", DataTypes.BIGINT()),
+                        Column.physical("f11", DataTypes.DECIMAL(10,2)),
+                        Column.physical("f12", DataTypes.TIMESTAMP_WITH_TIME_ZONE()),
+                        Column.physical("f13", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
+                        Column.physical("f14", DataTypes.DATE()),
+                        Column.physical("f15", DataTypes.CHAR(1)),
+                        Column.physical("f16", DataTypes.VARCHAR(256)));
+        DorisRowConverter converter = new DorisRowConverter((RowType) SCHEMA.toPhysicalRowDataType().getLogicalType());
+        LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
+        LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
+        LocalDate date1 = LocalDate.of(2021, 1, 1);
+        GenericRowData rowData = GenericRowData.of(null, true, 1.2F, 1.2345D, 24, 10, (byte) 1, (short) 32, 64, 128L,
+                DecimalData.fromBigDecimal(BigDecimal.valueOf(10.123), 5, 3),
+                TimestampData.fromLocalDateTime(time1), TimestampData.fromLocalDateTime(time2),
+                (int) date1.toEpochDay(), StringData.fromString("a"), StringData.fromString("doris"));
+        List row = new ArrayList();
+        for (int i = 0; i < rowData.getArity(); i++) {
+            row.add(converter.convertExternal(rowData, i));
+        }
+        Assert.assertEquals("[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:00:00.0, 2021-01-01 08:00:00.0, 2021-01-01, a, doris]", row.toString());
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index 8b66e01..261acbe 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -247,8 +249,8 @@ public class TestRowBatch {
                 1L,
                 (float) 1.1,
                 (double) 1.1,
-                "2008-08-08",
-                "2008-08-08 00:00:00",
+                LocalDate.of(2008, 8, 8),
+                LocalDateTime.of(2008,8,8,0,0,0),
                 DecimalData.fromBigDecimal(new BigDecimal(12.34), 4, 2),
                 "char1"
         );
@@ -261,8 +263,8 @@ public class TestRowBatch {
                 2L,
                 (float) 2.2,
                 (double) 2.2,
-                "1900-08-08",
-                "1900-08-08 00:00:00",
+                LocalDate.of(1900, 8, 8),
+                LocalDateTime.of(1900,8,8,0,0,0),
                 DecimalData.fromBigDecimal(new BigDecimal(88.88), 4, 2),
                 "char2"
         );
@@ -275,8 +277,8 @@ public class TestRowBatch {
                 3L,
                 (float) 3.3,
                 (double) 3.3,
-                "2100-08-08",
-                "2100-08-08 00:00:00",
+                LocalDate.of(2100, 8, 8),
+                LocalDateTime.of(2100,8,8,0,0,0),
                 DecimalData.fromBigDecimal(new BigDecimal(10.22), 4, 2),
                 "char3"
         );


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org