You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/15 07:58:04 UTC

[inlong] 05/06: [INLONG-6507][Sort] Convert date to timestamp in oracle connector (#6508)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 6b909beb7962f1cfc9b98cd2e12cafaea3f60d2c
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Tue Nov 15 15:40:07 2022 +0800

    [INLONG-6507][Sort] Convert date to timestamp in oracle connector (#6508)
---
 .../table/RowDataDebeziumDeserializeSchema.java    | 50 +++++++++++-----------
 1 file changed, 24 insertions(+), 26 deletions(-)

diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/RowDataDebeziumDeserializeSchema.java
index a0dcb5cf2..530b96d6d 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -83,8 +83,7 @@ public final class RowDataDebeziumDeserializeSchema
 
     private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
 
-    private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern(
-            "yyyy-MM-dd HH:mm:ss");
+    private static final ZoneId ZONE_UTC = ZoneId.of("UTC");
 
     /**
      * TypeInformation of the produced {@link RowData}. *
@@ -691,7 +690,7 @@ public final class RowDataDebeziumDeserializeSchema
                     if (schemaName != null) {
                         // normal type doesn't have schema name
                         // schema names are time schemas
-                        fieldValue = getTimeValue(fieldValue, schemaName);
+                        fieldValue = getValueWithSchema(fieldValue, schemaName);
                     }
                     data.put(fieldName, fieldValue);
                 }
@@ -714,6 +713,7 @@ public final class RowDataDebeziumDeserializeSchema
                     String fieldName = field.name();
                     Object fieldValue = struct.getWithoutDefault(fieldName);
                     Schema fieldSchema = schema.field(fieldName).schema();
+                    String schemaName = fieldSchema.name();
 
                     // struct type convert normal type
                     if (fieldValue instanceof Struct) {
@@ -732,6 +732,9 @@ public final class RowDataDebeziumDeserializeSchema
                             fieldValue = ((TimestampData) fieldValue).toTimestamp();
                         }
                     }
+                    if (schemaName != null) {
+                        fieldValue = getValueWithSchema(fieldValue, schemaName);
+                    }
                     if (fieldValue instanceof ByteBuffer) {
                         fieldValue = new String(((ByteBuffer) fieldValue).array());
                     }
@@ -748,33 +751,28 @@ public final class RowDataDebeziumDeserializeSchema
     }
 
     /**
-     * transform debezium time format to database format
+     * extract the data with the format provided by debezium
      *
      * @param fieldValue
      * @param schemaName
-     * @return
+     * @return the extracted data with schema
      */
-    private Object getTimeValue(Object fieldValue, String schemaName) {
-        switch (schemaName) {
-            case MicroTime.SCHEMA_NAME:
-                Instant instant = Instant.ofEpochMilli((Long) fieldValue / 1000);
-                fieldValue = timeFormatter.format(LocalDateTime.ofInstant(instant, serverTimeZone));
-                break;
-            case Date.SCHEMA_NAME:
-                fieldValue = dateFormatter.format(LocalDate.ofEpochDay((Integer) fieldValue));
-                break;
-            case ZonedTimestamp.SCHEMA_NAME:
-                ZonedDateTime zonedDateTime = ZonedDateTime.parse((CharSequence) fieldValue);
-                fieldValue = timestampFormatter.format(zonedDateTime
-                        .withZoneSameInstant(serverTimeZone).toLocalDateTime());
-                break;
-            case Timestamp.SCHEMA_NAME:
-                Instant instantTime = Instant.ofEpochMilli((Long) fieldValue);
-                fieldValue = timestampFormatter.format(LocalDateTime.ofInstant(instantTime,
-                        serverTimeZone));
-                break;
-            default:
-                LOG.error("parse schema {} error", schemaName);
+    private Object getValueWithSchema(Object fieldValue, String schemaName) {
+        if (fieldValue == null) {
+            return null;
+        }
+        if (MicroTime.SCHEMA_NAME.equals(schemaName)) {
+            Instant instant = Instant.ofEpochMilli((Long) fieldValue / 1000);
+            fieldValue = timeFormatter.format(LocalDateTime.ofInstant(instant, ZONE_UTC));
+        } else if (Date.SCHEMA_NAME.equals(schemaName)) {
+            fieldValue = dateFormatter.format(LocalDate.ofEpochDay((Integer) fieldValue));
+        } else if (ZonedTimestamp.SCHEMA_NAME.equals(schemaName)) {
+            ZonedDateTime zonedDateTime = ZonedDateTime.parse((CharSequence) fieldValue);
+            fieldValue = zonedDateTime.withZoneSameInstant(serverTimeZone).toLocalDateTime()
+                    .atZone(ZONE_UTC).format(DateTimeFormatter.ISO_INSTANT);
+        } else if (Timestamp.SCHEMA_NAME.equals(schemaName)) {
+            Instant instantTime = Instant.ofEpochMilli((Long) fieldValue);
+            fieldValue = LocalDateTime.ofInstant(instantTime, ZONE_UTC).toString();
         }
         return fieldValue;
     }