You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/31 05:24:06 UTC
[inlong] branch master updated: [INLONG-6326][Sort] Fix the incorrect log type in the all migrate converter (#6328)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new be28a0789 [INLONG-6326][Sort] Fix the incorrect log type in the all migrate converter (#6328)
be28a0789 is described below
commit be28a07899a1e4a9f80e6128acfae6bdb9973bba
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Mon Oct 31 13:24:00 2022 +0800
[INLONG-6326][Sort] Fix the incorrect log type in the all migrate converter (#6328)
---
.../debezium/table/RowDataDebeziumDeserializeSchema.java | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
index 6414620c5..204199598 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -596,9 +596,7 @@ public final class RowDataDebeziumDeserializeSchema
Schema fieldSchema = schema.field(fieldName).schema();
String schemaName = fieldSchema.name();
if (schemaName != null) {
- // normal type doesn't have schema name
- // schema names are time schemas
- fieldValue = getTimeValue(fieldValue, schemaName);
+ fieldValue = getValueWithSchema(fieldValue, schemaName);
}
data.put(fieldName, fieldValue);
}
@@ -612,13 +610,13 @@ public final class RowDataDebeziumDeserializeSchema
}
/**
- * transform debezium time format to database format
+ * extract the data with the format provided by debezium
*
* @param fieldValue
* @param schemaName
- * @return
+ * @return the extracted data with schema
*/
- private Object getTimeValue(Object fieldValue, String schemaName) {
+ private Object getValueWithSchema(Object fieldValue, String schemaName) {
if (fieldValue == null) {
return null;
}
@@ -638,8 +636,11 @@ public final class RowDataDebeziumDeserializeSchema
fieldValue = DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.ofInstant(instantTime,
serverTimeZone));
break;
+ case Decimal.LOGICAL_NAME:
+ // no need to transfer decimal type since the value is already decimal
+ break;
default:
- LOG.error("parse schema {} error", schemaName);
+ LOG.debug("schema {} is not being supported", schemaName);
}
return fieldValue;
}