You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by di...@apache.org on 2022/06/12 13:56:25 UTC
[incubator-doris-flink-connector] branch master updated: [Fix] fix flink date and timestamp type not mapping
This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7aa8d4b [Fix] fix flink date and timestamp type not mapping
7aa8d4b is described below
commit 7aa8d4bce84eb68a4bf2f842f24fdd6c897ff910
Author: gj-zhang <ra...@163.com>
AuthorDate: Sun Jun 12 21:56:19 2022 +0800
[Fix] fix flink date and timestamp type not mapping
* [FIX] fix flink date and timestamp type not mapping.
* Abstract the type conversion here and move it into DorisRowConverter, as recommended.
* Overload the constructor of DorisRowConverter.
Add a unit test for the external conversion.
* add license header.
---
.../RowDataDeserializationSchema.java | 2 +-
.../converter/DorisRowConverter.java | 133 ++++++++++++++++++++-
.../apache/doris/flink/serialization/RowBatch.java | 34 +++++-
.../doris/flink/sink/writer/RowDataSerializer.java | 15 +--
.../doris/flink/table/DorisRowDataInputFormat.java | 2 +-
.../doris/flink/DorisDateAndTimestampSqlTest.java | 66 ++++++++++
.../convert/DorisRowConverterTest.java | 64 +++++++++-
.../doris/flink/serialization/TestRowBatch.java | 14 ++-
8 files changed, 303 insertions(+), 27 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java
index f9c65b3..3342b66 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java
@@ -44,7 +44,7 @@ public class RowDataDeserializationSchema implements DorisDeserializationSchema<
@Override
public void deserialize(List<?> record, Collector<RowData> out) throws Exception {
- RowData row = rowConverter.convert(record);
+ RowData row = rowConverter.convertInternal(record);
out.collect(row);
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
index ee7a9a7..d1bb529 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -21,12 +21,20 @@ import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
import java.io.Serializable;
import java.math.BigDecimal;
+import java.sql.Date;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -35,12 +43,26 @@ public class DorisRowConverter implements Serializable {
private static final long serialVersionUID = 1L;
private final DeserializationConverter[] deserializationConverters;
+ private final SerializationConverter[] serializationConverters;
public DorisRowConverter(RowType rowType) {
checkNotNull(rowType);
this.deserializationConverters = new DeserializationConverter[rowType.getFieldCount()];
+ this.serializationConverters = new SerializationConverter[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); i++) {
- deserializationConverters[i] = createNullableConverter(rowType.getTypeAt(i));
+ deserializationConverters[i] = createNullableInternalConverter(rowType.getTypeAt(i));
+ serializationConverters[i] = createNullableExternalConverter(rowType.getTypeAt(i));
+ }
+ }
+
+ public DorisRowConverter(DataType[] dataTypes) {
+ checkNotNull(dataTypes);
+ this.deserializationConverters = new DeserializationConverter[dataTypes.length];
+ this.serializationConverters = new SerializationConverter[dataTypes.length];
+ for (int i = 0; i < dataTypes.length; i++) {
+ LogicalType logicalType = dataTypes[i].getLogicalType();
+ deserializationConverters[i] = createNullableInternalConverter(logicalType);
+ serializationConverters[i] = createNullableExternalConverter(logicalType);
}
}
@@ -49,7 +71,7 @@ public class DorisRowConverter implements Serializable {
*
* @param record from rowBatch
*/
- public GenericRowData convert(List record){
+ public GenericRowData convertInternal(List record) {
GenericRowData rowData = new GenericRowData(deserializationConverters.length);
for (int i = 0; i < deserializationConverters.length ; i++) {
rowData.setField(i, deserializationConverters[i].deserialize(record.get(i)));
@@ -57,13 +79,23 @@ public class DorisRowConverter implements Serializable {
return rowData;
}
+ /**
+ * Convert the field at {@code index} of a {@link RowData} into its external Java value.
+ * @param rowData the flink row to read the field from
+ * @param index the field index; it also selects the per-field converter
+ * @return the value produced by that field's {@link SerializationConverter}.
+ */
+ public Object convertExternal(RowData rowData, int index) {
+ return serializationConverters[index].serialize(index, rowData);
+ }
+
/**
* Create a nullable runtime {@link DeserializationConverter} from given {@link
* LogicalType}.
*/
- protected DeserializationConverter createNullableConverter(LogicalType type) {
- return wrapIntoNullableInternalConverter(createConverter(type));
+ protected DeserializationConverter createNullableInternalConverter(LogicalType type) {
+ return wrapIntoNullableInternalConverter(createInternalConverter(type));
}
protected DeserializationConverter wrapIntoNullableInternalConverter(
@@ -77,6 +109,20 @@ public class DorisRowConverter implements Serializable {
};
}
+ protected SerializationConverter createNullableExternalConverter(LogicalType type) {
+ return wrapIntoNullableExternalConverter(createExternalConverter(type));
+ }
+ /** Wrap a {@link SerializationConverter} so that a null row or a null field at {@code index} yields {@code null} instead of reaching the typed getter. */
+ protected SerializationConverter wrapIntoNullableExternalConverter(SerializationConverter serializationConverter) {
+ return (index, val) -> {
+ if (val == null || val.isNullAt(index)) { // typed getters such as getInt/getLong must not be called on a null field
+ return null;
+ } else {
+ return serializationConverter.serialize(index, val);
+ }
+ };
+ }
+
/** Runtime converter to convert doris field to {@link RowData} type object. */
@FunctionalInterface
interface DeserializationConverter extends Serializable {
@@ -88,7 +134,15 @@ public class DorisRowConverter implements Serializable {
Object deserialize(Object field);
}
- protected DeserializationConverter createConverter(LogicalType type) {
+ /**
+ * Runtime converter to convert {@link RowData} type object to doris field.
+ */
+ @FunctionalInterface
+ interface SerializationConverter extends Serializable {
+ Object serialize(int index, RowData field);
+ }
+
+ protected DeserializationConverter createInternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case NULL:
return val -> null;
@@ -109,7 +163,21 @@ public class DorisRowConverter implements Serializable {
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return val -> {
+ if (val instanceof LocalDateTime) {
+ return TimestampData.fromLocalDateTime((LocalDateTime) val);
+ } else {
+ throw new UnsupportedOperationException("timestamp type must be java.time.LocalDateTime, the actual type is: " + val.getClass().getName());
+ }
+ };
case DATE:
+ return val -> { // DATE branch: the value is stored as an int count of days since the epoch
+ if (val instanceof LocalDate) {
+ return (int) ((LocalDate) val).toEpochDay();
+ } else { // message names the date type and prints the class name like the timestamp branch does
+ throw new UnsupportedOperationException("date type must be java.time.LocalDate, the actual type is: " + val.getClass().getName());
+ }
+ };
case CHAR:
case VARCHAR:
return val -> StringData.fromString((String) val);
@@ -125,4 +193,59 @@ public class DorisRowConverter implements Serializable {
throw new UnsupportedOperationException("Unsupported type:" + type);
}
}
+ /** Create the converter that reads one field of the given logical type from a {@link RowData}; types without a case below throw {@link UnsupportedOperationException}. */
+ protected SerializationConverter createExternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) { // dispatch on the logical type root
+ case NULL:
+ return ((index, val) -> null); // a NULL-typed column always yields null
+ case CHAR:
+ case VARCHAR:
+ return (index, val) -> val.getString(index);
+ case BOOLEAN:
+ return (index, val) -> val.getBoolean(index);
+ case BINARY:
+ case VARBINARY:
+ return (index, val) -> val.getBinary(index); // NOTE(review): RowDataSerializer calls toString() on the result - confirm that is meaningful for binary data
+ case DECIMAL:
+ final int decimalPrecision = ((DecimalType) type).getPrecision(); // read once, outside the lambda
+ final int decimalScale = ((DecimalType) type).getScale();
+ return (index, val) -> val.getDecimal(index, decimalPrecision, decimalScale);
+ case TINYINT:
+ return (index, val) -> val.getByte(index);
+ case SMALLINT:
+ return (index, val) -> val.getShort(index);
+ case INTEGER:
+ case INTERVAL_YEAR_MONTH:
+ case INTERVAL_DAY_TIME: // NOTE(review): Flink's RowData docs list INTERVAL_DAY_TIME as a long internally - confirm getInt is intended
+ return (index, val) -> val.getInt(index);
+ case BIGINT:
+ return (index, val) -> val.getLong(index);
+ case FLOAT:
+ return (index, val) -> val.getFloat(index);
+ case DOUBLE:
+ return (index, val) -> val.getDouble(index);
+ case DATE:
+ return (index, val) -> Date.valueOf(LocalDate.ofEpochDay(val.getInt(index))); // int epoch day -> java.sql.Date
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ final int timestampPrecision = ((TimestampType) type).getPrecision(); // precision is needed to read the timestamp field
+ return (index, val) -> val.getTimestamp(index, timestampPrecision).toTimestamp();
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ final int localP = ((LocalZonedTimestampType) type).getPrecision();
+ return (index, val) -> val.getTimestamp(index, localP).toTimestamp();
+ case TIMESTAMP_WITH_TIME_ZONE:
+ final int zonedP = ((ZonedTimestampType) type).getPrecision();
+ return (index, val) -> val.getTimestamp(index, zonedP).toTimestamp();
+ case ARRAY: // every case from here down falls through to the throw
+ case MULTISET:
+ case MAP:
+ case ROW:
+ case STRUCTURED_TYPE:
+ case DISTINCT_TYPE:
+ case RAW:
+ case SYMBOL:
+ case UNRESOLVED:
+ default:
+ throw new UnsupportedOperationException("Unsupported type:" + type);
+ }
+ }
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 3be6f87..4dd6732 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -42,6 +42,9 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
@@ -79,6 +82,9 @@ public class RowBatch {
private RootAllocator rootAllocator;
private final Schema schema;
+ private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
public List<Row> getRowBatch() {
return rowBatch;
}
@@ -243,8 +249,34 @@ public class RowBatch {
}
break;
case "DATE":
- case "LARGEINT":
+ Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
+ typeMismatchMessage(currentType, mt));
+ VarCharVector date = (VarCharVector) curFieldVector;
+ for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
+ if (date.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ continue;
+ }
+ String value = new String(date.get(rowIndex));
+ LocalDate localDate = LocalDate.parse(value, dateFormatter);
+ addValueToRow(rowIndex, localDate);
+ }
+ break;
case "DATETIME":
+ Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
+ typeMismatchMessage(currentType, mt));
+ VarCharVector timeStampSecVector = (VarCharVector) curFieldVector;
+ for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
+ if (timeStampSecVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ continue;
+ }
+ String value = new String(timeStampSecVector.get(rowIndex));
+ LocalDateTime parse = LocalDateTime.parse(value, dateTimeFormatter);
+ addValueToRow(rowIndex, parse);
+ }
+ break;
+ case "LARGEINT":
case "CHAR":
case "VARCHAR":
case "STRING":
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
index 4d692c0..5d00301 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
@@ -18,6 +18,7 @@
package org.apache.doris.flink.sink.writer;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
@@ -33,18 +34,17 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
import static org.apache.doris.flink.sink.writer.LoadConstants.NULL_VALUE;
-import static org.apache.flink.table.data.RowData.createFieldGetter;
/**
* Serializer for RowData.
*/
public class RowDataSerializer implements DorisRecordSerializer<RowData> {
String[] fieldNames;
- RowData.FieldGetter[] fieldGetters;
String type;
private ObjectMapper objectMapper;
private final String fieldDelimiter;
private final boolean enableDelete;
+ private final DorisRowConverter rowConverter;
private RowDataSerializer(String[] fieldNames, DataType[] dataTypes, String type, String fieldDelimiter, boolean enableDelete) {
this.fieldNames = fieldNames;
@@ -54,15 +54,12 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
if (JSON.equals(type)) {
objectMapper = new ObjectMapper();
}
- this.fieldGetters = new RowData.FieldGetter[dataTypes.length];
- for (int fieldIndex = 0; fieldIndex < dataTypes.length; fieldIndex++) {
- fieldGetters[fieldIndex] = createFieldGetter(dataTypes[fieldIndex].getLogicalType(), fieldIndex);
- }
+ this.rowConverter = new DorisRowConverter(dataTypes);
}
@Override
public byte[] serialize(RowData record) throws IOException{
- int maxIndex = Math.min(record.getArity(), fieldGetters.length);
+ int maxIndex = Math.min(record.getArity(), fieldNames.length);
String valString;
if (JSON.equals(type)) {
valString = buildJsonString(record, maxIndex);
@@ -78,7 +75,7 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
int fieldIndex = 0;
Map<String, String> valueMap = new HashMap<>();
while (fieldIndex < maxIndex) {
- Object field = fieldGetters[fieldIndex].getFieldOrNull(record);
+ Object field = rowConverter.convertExternal(record, fieldIndex);
String value = field != null ? field.toString() : null;
valueMap.put(fieldNames[fieldIndex], value);
fieldIndex++;
@@ -93,7 +90,7 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
int fieldIndex = 0;
StringJoiner joiner = new StringJoiner(fieldDelimiter);
while (fieldIndex < maxIndex) {
- Object field = fieldGetters[fieldIndex].getFieldOrNull(record);
+ Object field = rowConverter.convertExternal(record, fieldIndex);
String value = field != null ? field.toString() : NULL_VALUE;
joiner.add(value);
fieldIndex++;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
index fbcaeea..16cf2ee 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
@@ -148,7 +148,7 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
return null;
}
List next = (List) scalaValueReader.next();
- RowData genericRowData = rowConverter.convert(next);
+ RowData genericRowData = rowConverter.convertInternal(next);
//update hasNext after we've read the record
hasNext = scalaValueReader.hasNext();
return genericRowData;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisDateAndTimestampSqlTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisDateAndTimestampSqlTest.java
new file mode 100644
index 0000000..d135995
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisDateAndTimestampSqlTest.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+import java.util.UUID;
+
+public class DorisDateAndTimestampSqlTest {
+
+ public static void main(String[] args) {
+ TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ tEnv.executeSql("create table test_source ( " +
+ " id INT, " +
+ " score DECIMAL(10, 9), " +
+ " submit_time TIMESTAMP " +
+ " ) with ( " +
+ " 'password'='', " +
+ " 'connector'='doris', " +
+ " 'fenodes'='FE_HOST:FE_PORT', " +
+ " 'table.identifier'='db.source_table', " +
+ " 'username'='root' " +
+ ")");
+
+ tEnv.executeSql("create table test_sink ( " +
+ " id INT, " +
+ " score DECIMAL(10, 9), " +
+ " submit_time DATE " +
+ " ) with ( " +
+ " 'password'='', " +
+ " 'connector'='doris', " +
+ " 'fenodes'='FE_HOST:FE_PORT', " +
+ " 'sink.label-prefix' = 'label_" + UUID.randomUUID()+"' , " +
+ " 'table.identifier'='db.sink_table', " +
+ " 'username'='root' " +
+ ")");
+ tEnv.executeSql(
+ "insert into " +
+ " test_sink " +
+ "select " +
+ " id, " +
+ " score," +
+ " to_date(DATE_FORMAT(submit_time, 'yyyy-MM-dd')) as submit_time " +
+ "from " +
+ " test_source " +
+ "where " +
+ " submit_time>='2022-05-31 00:00:00'")
+ .print();
+ }
+
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
index 3489399..e56a40e 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
@@ -17,23 +17,33 @@
package org.apache.doris.flink.deserialization.convert;
import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
+import org.apache.doris.flink.sink.writer.RowDataSerializer;
+import org.apache.doris.flink.sink.writer.RowDataSerializer.Builder;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class DorisRowConverterTest implements Serializable {
@Test
- public void testConvert(){
+ public void testConvert() throws IOException {
ResolvedSchema SCHEMA =
ResolvedSchema.of(
Column.physical("f1", DataTypes.NULL()),
@@ -55,8 +65,54 @@ public class DorisRowConverterTest implements Serializable {
DorisRowConverter converter = new DorisRowConverter((RowType) SCHEMA.toPhysicalRowDataType().getLogicalType());
- List record = Arrays.asList(null,"true",1.2,1.2345,24,10,1,32,64,128, BigDecimal.valueOf(10.123),"2021-01-01 08:00:00","2021-01-01 08:00:00","2021-01-01","a","doris");
- GenericRowData rowData = converter.convert(record);
- Assert.assertEquals("+I(null,true,1.2,1.2345,24,10,1,32,64,128,10.12,2021-01-01 08:00:00,2021-01-01 08:00:00,2021-01-01,a,doris)",rowData.toString());
+ LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
+ LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
+ LocalDate date1 = LocalDate.of(2021, 1, 1);
+ List record = Arrays.asList(null,true,1.2F,1.2345D,24,10,(byte) 1, (short) 32,64,128L, BigDecimal.valueOf(10.123),time1,time2, date1,"a","doris");
+ GenericRowData rowData = converter.convertInternal(record);
+
+ RowDataSerializer serializer = new Builder()
+ .setFieldType(SCHEMA.getColumnDataTypes().toArray(new DataType[0]))
+ .setType("csv")
+ .setFieldDelimiter("|")
+ .setFieldNames(new String[]{"f1","f2","f3","f4","f5","f6","f7","f8","f9","f10","f11","f12","f13","f14","f15","f16"})
+ .build();
+ String s = new String(serializer.serialize(rowData));
+ Assert.assertEquals("\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:00:00.0|2021-01-01 08:00:00.0|2021-01-01|a|doris", s);
+ }
+
+ @Test
+ public void testExternalConvert() {
+ ResolvedSchema SCHEMA =
+ ResolvedSchema.of(
+ Column.physical("f1", DataTypes.NULL()),
+ Column.physical("f2", DataTypes.BOOLEAN()),
+ Column.physical("f3", DataTypes.FLOAT()),
+ Column.physical("f4", DataTypes.DOUBLE()),
+ Column.physical("f5", DataTypes.INTERVAL(DataTypes.YEAR())),
+ Column.physical("f6", DataTypes.INTERVAL(DataTypes.DAY())),
+ Column.physical("f7", DataTypes.TINYINT()),
+ Column.physical("f8", DataTypes.SMALLINT()),
+ Column.physical("f9", DataTypes.INT()),
+ Column.physical("f10", DataTypes.BIGINT()),
+ Column.physical("f11", DataTypes.DECIMAL(10,2)),
+ Column.physical("f12", DataTypes.TIMESTAMP_WITH_TIME_ZONE()),
+ Column.physical("f13", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
+ Column.physical("f14", DataTypes.DATE()),
+ Column.physical("f15", DataTypes.CHAR(1)),
+ Column.physical("f16", DataTypes.VARCHAR(256)));
+ DorisRowConverter converter = new DorisRowConverter((RowType) SCHEMA.toPhysicalRowDataType().getLogicalType());
+ LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
+ LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
+ LocalDate date1 = LocalDate.of(2021, 1, 1);
+ GenericRowData rowData = GenericRowData.of(null, true, 1.2F, 1.2345D, 24, 10, (byte) 1, (short) 32, 64, 128L,
+ DecimalData.fromBigDecimal(BigDecimal.valueOf(10.123), 5, 3),
+ TimestampData.fromLocalDateTime(time1), TimestampData.fromLocalDateTime(time2),
+ (int) date1.toEpochDay(), StringData.fromString("a"), StringData.fromString("doris"));
+ List row = new ArrayList();
+ for (int i = 0; i < rowData.getArity(); i++) {
+ row.add(converter.convertExternal(rowData, i));
+ }
+ Assert.assertEquals("[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:00:00.0, 2021-01-01 08:00:00.0, 2021-01-01, a, doris]", row.toString());
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index 8b66e01..261acbe 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
@@ -247,8 +249,8 @@ public class TestRowBatch {
1L,
(float) 1.1,
(double) 1.1,
- "2008-08-08",
- "2008-08-08 00:00:00",
+ LocalDate.of(2008, 8, 8),
+ LocalDateTime.of(2008,8,8,0,0,0),
DecimalData.fromBigDecimal(new BigDecimal(12.34), 4, 2),
"char1"
);
@@ -261,8 +263,8 @@ public class TestRowBatch {
2L,
(float) 2.2,
(double) 2.2,
- "1900-08-08",
- "1900-08-08 00:00:00",
+ LocalDate.of(1900, 8, 8),
+ LocalDateTime.of(1900,8,8,0,0,0),
DecimalData.fromBigDecimal(new BigDecimal(88.88), 4, 2),
"char2"
);
@@ -275,8 +277,8 @@ public class TestRowBatch {
3L,
(float) 3.3,
(double) 3.3,
- "2100-08-08",
- "2100-08-08 00:00:00",
+ LocalDate.of(2100, 8, 8),
+ LocalDateTime.of(2100,8,8,0,0,0),
DecimalData.fromBigDecimal(new BigDecimal(10.22), 4, 2),
"char3"
);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org