You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/15 07:58:04 UTC
[inlong] 05/06: [INLONG-6507][Sort] Convert date to timestamp in oracle connector (#6508)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 6b909beb7962f1cfc9b98cd2e12cafaea3f60d2c
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Tue Nov 15 15:40:07 2022 +0800
[INLONG-6507][Sort] Convert date to timestamp in oracle connector (#6508)
---
.../table/RowDataDebeziumDeserializeSchema.java | 50 +++++++++++-----------
1 file changed, 24 insertions(+), 26 deletions(-)
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/RowDataDebeziumDeserializeSchema.java
index a0dcb5cf2..530b96d6d 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -83,8 +83,7 @@ public final class RowDataDebeziumDeserializeSchema
private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
- private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern(
- "yyyy-MM-dd HH:mm:ss");
+ private static final ZoneId ZONE_UTC = ZoneId.of("UTC");
/**
* TypeInformation of the produced {@link RowData}. *
@@ -691,7 +690,7 @@ public final class RowDataDebeziumDeserializeSchema
if (schemaName != null) {
// normal type doesn't have schema name
// schema names are time schemas
- fieldValue = getTimeValue(fieldValue, schemaName);
+ fieldValue = getValueWithSchema(fieldValue, schemaName);
}
data.put(fieldName, fieldValue);
}
@@ -714,6 +713,7 @@ public final class RowDataDebeziumDeserializeSchema
String fieldName = field.name();
Object fieldValue = struct.getWithoutDefault(fieldName);
Schema fieldSchema = schema.field(fieldName).schema();
+ String schemaName = fieldSchema.name();
// struct type convert normal type
if (fieldValue instanceof Struct) {
@@ -732,6 +732,9 @@ public final class RowDataDebeziumDeserializeSchema
fieldValue = ((TimestampData) fieldValue).toTimestamp();
}
}
+ if (schemaName != null) {
+ fieldValue = getValueWithSchema(fieldValue, schemaName);
+ }
if (fieldValue instanceof ByteBuffer) {
fieldValue = new String(((ByteBuffer) fieldValue).array());
}
@@ -748,33 +751,28 @@ public final class RowDataDebeziumDeserializeSchema
}
/**
- * transform debezium time format to database format
+ * extract the data with the format provided by debezium
*
* @param fieldValue
* @param schemaName
- * @return
+ * @return the extracted data with schema
*/
- private Object getTimeValue(Object fieldValue, String schemaName) {
- switch (schemaName) {
- case MicroTime.SCHEMA_NAME:
- Instant instant = Instant.ofEpochMilli((Long) fieldValue / 1000);
- fieldValue = timeFormatter.format(LocalDateTime.ofInstant(instant, serverTimeZone));
- break;
- case Date.SCHEMA_NAME:
- fieldValue = dateFormatter.format(LocalDate.ofEpochDay((Integer) fieldValue));
- break;
- case ZonedTimestamp.SCHEMA_NAME:
- ZonedDateTime zonedDateTime = ZonedDateTime.parse((CharSequence) fieldValue);
- fieldValue = timestampFormatter.format(zonedDateTime
- .withZoneSameInstant(serverTimeZone).toLocalDateTime());
- break;
- case Timestamp.SCHEMA_NAME:
- Instant instantTime = Instant.ofEpochMilli((Long) fieldValue);
- fieldValue = timestampFormatter.format(LocalDateTime.ofInstant(instantTime,
- serverTimeZone));
- break;
- default:
- LOG.error("parse schema {} error", schemaName);
+ private Object getValueWithSchema(Object fieldValue, String schemaName) {
+ if (fieldValue == null) {
+ return null;
+ }
+ if (MicroTime.SCHEMA_NAME.equals(schemaName)) {
+ Instant instant = Instant.ofEpochMilli((Long) fieldValue / 1000);
+ fieldValue = timeFormatter.format(LocalDateTime.ofInstant(instant, ZONE_UTC));
+ } else if (Date.SCHEMA_NAME.equals(schemaName)) {
+ fieldValue = dateFormatter.format(LocalDate.ofEpochDay((Integer) fieldValue));
+ } else if (ZonedTimestamp.SCHEMA_NAME.equals(schemaName)) {
+ ZonedDateTime zonedDateTime = ZonedDateTime.parse((CharSequence) fieldValue);
+ fieldValue = zonedDateTime.withZoneSameInstant(serverTimeZone).toLocalDateTime()
+ .atZone(ZONE_UTC).format(DateTimeFormatter.ISO_INSTANT);
+ } else if (Timestamp.SCHEMA_NAME.equals(schemaName)) {
+ Instant instantTime = Instant.ofEpochMilli((Long) fieldValue);
+ fieldValue = LocalDateTime.ofInstant(instantTime, ZONE_UTC).toString();
}
return fieldValue;
}