You are viewing a plain-text version of this content. The canonical link to it (originally the hyperlink "here") was not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by yu...@apache.org on 2022/11/07 03:17:45 UTC

[inlong] branch master updated: [INLONG-6402][Sort] Modify the metadata field of oracle connector (#6404)

This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f8266fe35 [INLONG-6402][Sort] Modify the metadata field of oracle connector (#6404)
f8266fe35 is described below

commit f8266fe35c83456eabe356ff27927deafd6342f0
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Mon Nov 7 11:17:39 2022 +0800

    [INLONG-6402][Sort] Modify the metadata field of oracle connector (#6404)
    
    * [INLONG-6402][Sort] Modify the metadata field of oracle connector
    
    * [INLONG-6402][Sort] Remove extra punctuation
    
    * [INLONG-6402][Sort] Modify the metadata field of oracle connector to be consistent with mysql connector
    
    * [INLONG-6402][Sort] Compatible with open source oracle connector
    
    * [INLONG-6402][Sort] Remove incorrect information
    
    * [INLONG-6402][Sort] Remove unused meta field
    
    Co-authored-by: menghuiyu <me...@tencent.com>
---
 .../org/apache/inlong/common/enums/MetaField.java  |   5 +
 .../org/apache/inlong/sort/protocol/Metadata.java  |  11 +-
 .../protocol/node/extract/OracleExtractNode.java   |  55 ++++++-
 .../node/extract/OracleExtractNodeTest.java        |  14 +-
 .../cdc/oracle/table/OracleReadableMetaData.java   | 166 ++++++++-------------
 5 files changed, 137 insertions(+), 114 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
index f88da9c4d..4a065583d 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
@@ -116,6 +116,11 @@ public enum MetaField {
      */
     MYSQL_TYPE,
 
+    /**
+     * The table structure. It is only used for Oracle database
+     */
+    ORACLE_TYPE,
+
     /**
      * Primary key field name. Currently, it is used for MySQL database.
      */
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
index 71c37e4ae..6c2ac4d2d 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
@@ -54,14 +54,6 @@ public interface Metadata {
             case OP_TS:
                 metadataKey = "op_ts";
                 break;
-            case DATA:
-            case DATA_BYTES:
-                metadataKey = "meta.data";
-                break;
-            case DATA_CANAL:
-            case DATA_BYTES_CANAL:
-                metadataKey = "meta.data_canal";
-                break;
 
             default:
                 throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
@@ -106,6 +98,9 @@ public interface Metadata {
             case MYSQL_TYPE:
                 metadataType = "MAP<STRING, STRING>";
                 break;
+            case ORACLE_TYPE:
+                metadataType = "MAP<STRING, STRING>";
+                break;
             case PK_NAMES:
                 metadataType = "ARRAY<STRING>";
                 break;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java
index a1bad2523..594ccdd60 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java
@@ -150,7 +150,58 @@ public class OracleExtractNode extends ExtractNode implements InlongMetric, Meta
     @Override
     public Set<MetaField> supportedMetaFields() {
         return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.DATABASE_NAME,
-                MetaField.SCHEMA_NAME, MetaField.OP_TS, MetaField.DATA, MetaField.DATA_CANAL,
-                MetaField.DATA_BYTES, MetaField.DATA_BYTES_CANAL);
+                MetaField.SCHEMA_NAME, MetaField.OP_TS, MetaField.OP_TYPE, MetaField.DATA, MetaField.DATA_BYTES,
+                MetaField.DATA_CANAL, MetaField.DATA_BYTES_CANAL, MetaField.IS_DDL, MetaField.TS,
+                MetaField.SQL_TYPE, MetaField.ORACLE_TYPE, MetaField.PK_NAMES);
     }
+
+    @Override
+    public String getMetadataKey(MetaField metaField) {
+        String metadataKey;
+        switch (metaField) {
+            case TABLE_NAME:
+                metadataKey = "meta.table_name";
+                break;
+            case DATABASE_NAME:
+                metadataKey = "meta.database_name";
+                break;
+            case SCHEMA_NAME:
+                metadataKey = "meta.schema_name";
+                break;
+            case OP_TS:
+                metadataKey = "meta.op_ts";
+                break;
+            case OP_TYPE:
+                metadataKey = "meta.op_type";
+                break;
+            case DATA:
+            case DATA_BYTES:
+                metadataKey = "meta.data";
+                break;
+            case DATA_CANAL:
+            case DATA_BYTES_CANAL:
+                metadataKey = "meta.data_canal";
+                break;
+            case IS_DDL:
+                metadataKey = "meta.is_ddl";
+                break;
+            case TS:
+                metadataKey = "meta.ts";
+                break;
+            case SQL_TYPE:
+                metadataKey = "meta.sql_type";
+                break;
+            case ORACLE_TYPE:
+                metadataKey = "meta.oracle_type";
+                break;
+            case PK_NAMES:
+                metadataKey = "meta.pk_names";
+                break;
+            default:
+                throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+                        this.getClass().getSimpleName(), metaField));
+        }
+        return metadataKey;
+    }
+
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNodeTest.java
index 98394467b..3ddd10c85 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNodeTest.java
@@ -50,14 +50,20 @@ public class OracleExtractNodeTest extends SerializeBaseTest<OracleExtractNode>
     public void testMetaFields() {
         Map<MetaField, String> formatMap = new HashMap<>();
         formatMap.put(MetaField.PROCESS_TIME, "AS PROCTIME()");
-        formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 'table_name' VIRTUAL");
-        formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'database_name' VIRTUAL");
-        formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL");
-        formatMap.put(MetaField.SCHEMA_NAME, "STRING METADATA FROM 'schema_name' VIRTUAL");
+        formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 'meta.table_name' VIRTUAL");
+        formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'meta.database_name' VIRTUAL");
+        formatMap.put(MetaField.SCHEMA_NAME, "STRING METADATA FROM 'meta.schema_name' VIRTUAL");
+        formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 'meta.op_ts' VIRTUAL");
+        formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'meta.op_type' VIRTUAL");
         formatMap.put(MetaField.DATA, "STRING METADATA FROM 'meta.data' VIRTUAL");
         formatMap.put(MetaField.DATA_CANAL, "STRING METADATA FROM 'meta.data_canal' VIRTUAL");
         formatMap.put(MetaField.DATA_BYTES, "BYTES METADATA FROM 'meta.data' VIRTUAL");
         formatMap.put(MetaField.DATA_BYTES_CANAL, "BYTES METADATA FROM 'meta.data_canal' VIRTUAL");
+        formatMap.put(MetaField.IS_DDL, "BOOLEAN METADATA FROM 'meta.is_ddl' VIRTUAL");
+        formatMap.put(MetaField.TS, "TIMESTAMP_LTZ(3) METADATA FROM 'meta.ts' VIRTUAL");
+        formatMap.put(MetaField.SQL_TYPE, "MAP<STRING, INT> METADATA FROM 'meta.sql_type' VIRTUAL");
+        formatMap.put(MetaField.ORACLE_TYPE, "MAP<STRING, STRING> METADATA FROM 'meta.oracle_type' VIRTUAL");
+        formatMap.put(MetaField.PK_NAMES, "ARRAY<STRING> METADATA FROM 'meta.pk_names' VIRTUAL");
         OracleExtractNode node = getTestObject();
         boolean formatEquals = true;
         for (MetaField metaField : node.supportedMetaFields()) {
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
index d40758b45..1b48a5870 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
@@ -47,7 +47,9 @@ import org.apache.kafka.connect.source.SourceRecord;
 /** Defines the supported metadata columns for {@link OracleTableSource}. */
 public enum OracleReadableMetaData {
 
-    /** Name of the table that contain the row. */
+    /**
+     * Name of the table that contains the row.
+     */
     TABLE_NAME(
             "table_name",
             DataTypes.STRING().notNull(),
@@ -59,7 +61,10 @@ public enum OracleReadableMetaData {
                     return StringData.fromString(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY));
                 }
             }),
-    /** Name of the schema that contain the row. */
+
+    /**
+     * Name of the schema that contains the row.
+     */
     SCHEMA_NAME(
             "schema_name",
             DataTypes.STRING().notNull(),
@@ -72,7 +77,9 @@ public enum OracleReadableMetaData {
                 }
             }),
 
-    /** Name of the database that contain the row. */
+    /**
+     * Name of the database that contains the row.
+     */
     DATABASE_NAME(
             "database_name",
             DataTypes.STRING().notNull(),
@@ -86,72 +93,50 @@ public enum OracleReadableMetaData {
             }),
 
     /**
-     * It indicates the time that the change was made in the database. If the record is read from
-     * snapshot of the table instead of the change stream, the value is always 0.
+     * It indicates the time that the change was made in the database.
      */
     OP_TS(
             "op_ts",
-            DataTypes.TIMESTAMP_LTZ(3).notNull(),
+            DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
             new MetadataConverter() {
                 private static final long serialVersionUID = 1L;
 
                 @Override
                 public Object read(SourceRecord record) {
                     Struct messageStruct = (Struct) record.value();
-                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                    Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
                     return TimestampData.fromEpochMillis(
                             (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
                 }
             }),
 
-    DATA(
-            "meta.data",
-            DataTypes.STRING(),
-            new MetadataConverter() {
-                private static final long serialVersionUID = 1L;
-
-                @Override
-                public Object read(SourceRecord record) {
-                    return null;
-                }
-
-                @Override
-                public Object read(SourceRecord record,
-                        @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
-                    return getCanalData(record, tableSchema, (GenericRowData) rowData);
-                }
-            }),
-
-    DATA_CANAL(
-            "meta.data_canal",
-            DataTypes.STRING(),
+    /**
+     * Name of the table that contains the row.
+     */
+    META_TABLE_NAME(
+            "meta.table_name",
+            DataTypes.STRING().notNull(),
             new MetadataConverter() {
                 private static final long serialVersionUID = 1L;
 
                 @Override
                 public Object read(SourceRecord record) {
-                    return null;
-                }
-
-                @Override
-                public Object read(SourceRecord record,
-                        @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
-                    return getCanalData(record, tableSchema, (GenericRowData) rowData);
+                    return StringData.fromString(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY));
                 }
             }),
 
     /**
-     * Name of the table that contain the row. .
+     * Name of the schema that contains the row.
      */
-    META_TABLE_NAME(
-            "meta.table_name",
+    META_SCHEMA_NAME(
+            "meta.schema_name",
             DataTypes.STRING().notNull(),
             new MetadataConverter() {
                 private static final long serialVersionUID = 1L;
 
                 @Override
                 public Object read(SourceRecord record) {
-                    return StringData.fromString(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY));
+                    return StringData.fromString(getMetaData(record, AbstractSourceInfo.SCHEMA_NAME_KEY));
                 }
             }),
 
@@ -171,8 +156,7 @@ public enum OracleReadableMetaData {
             }),
 
     /**
-     * It indicates the time that the change was made in the database. If the record is read from
-     * snapshot of the table instead of the binlog, the value is always 0.
+     * It indicates the time that the change was made in the database.
      */
     META_OP_TS(
             "meta.op_ts",
@@ -189,79 +173,74 @@ public enum OracleReadableMetaData {
                 }
             }),
 
-    /**
-     * Operation type, INSERT/UPDATE/DELETE.
-     */
-    OP_TYPE(
-            "meta.op_type",
-            DataTypes.STRING().notNull(),
+    DATA(
+            "meta.data",
+            DataTypes.STRING(),
             new MetadataConverter() {
                 private static final long serialVersionUID = 1L;
 
                 @Override
                 public Object read(SourceRecord record) {
-                    return StringData.fromString(getOpType(record));
+                    return null;
+                }
+
+                @Override
+                public Object read(SourceRecord record,
+                        @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
+                    return getCanalData(record, tableSchema, (GenericRowData) rowData);
                 }
             }),
 
-    /**
-     * Not important, a simple increment counter.
-     */
-    BATCH_ID(
-            "meta.batch_id",
-            DataTypes.BIGINT().nullable(),
+    DATA_CANAL(
+            "meta.data_canal",
+            DataTypes.STRING(),
             new MetadataConverter() {
                 private static final long serialVersionUID = 1L;
 
-                private long id = 0;
-
                 @Override
                 public Object read(SourceRecord record) {
-                    return id++;
+                    return null;
+                }
+
+                @Override
+                public Object read(SourceRecord record,
+                        @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
+                    return getCanalData(record, tableSchema, (GenericRowData) rowData);
                 }
             }),
 
     /**
-     * Source does not emit ddl data.
+     * Operation type, INSERT/UPDATE/DELETE.
      */
-    IS_DDL(
-            "meta.is_ddl",
-            DataTypes.BOOLEAN().notNull(),
+    OP_TYPE(
+            "meta.op_type",
+            DataTypes.STRING().notNull(),
             new MetadataConverter() {
                 private static final long serialVersionUID = 1L;
 
                 @Override
                 public Object read(SourceRecord record) {
-                    return false;
+                    return StringData.fromString(getOpType(record));
                 }
             }),
 
     /**
-     * The update-before data for UPDATE record.
+     * Source does not emit ddl data.
      */
-    OLD(
-            "meta.update_before",
-            DataTypes.ARRAY(
-                            DataTypes.MAP(
-                                            DataTypes.STRING().nullable(),
-                                            DataTypes.STRING().nullable())
-                                    .nullable())
-                    .nullable(),
+    IS_DDL(
+            "meta.is_ddl",
+            DataTypes.BOOLEAN().notNull(),
             new MetadataConverter() {
                 private static final long serialVersionUID = 1L;
 
                 @Override
                 public Object read(SourceRecord record) {
-                    final Envelope.Operation op = Envelope.operationFor(record);
-                    if (op != Envelope.Operation.UPDATE) {
-                        return null;
-                    }
-                    return record;
+                    return false;
                 }
             }),
 
-    MYSQL_TYPE(
-            "meta.mysql_type",
+    ORACLE_TYPE(
+            "meta.oracle_type",
             DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).nullable(),
             new MetadataConverter() {
                 private static final long serialVersionUID = 1L;
@@ -274,24 +253,11 @@ public enum OracleReadableMetaData {
                 @Override
                 public Object read(
                         SourceRecord record, @Nullable TableChanges.TableChange tableSchema) {
-                    if (tableSchema == null) {
+                    Map<String, String> oracleType = getOracleType(tableSchema);
+                    if (oracleType == null) {
                         return null;
                     }
-                    Map<StringData, StringData> mysqlType = new HashMap<>();
-                    final Table table = tableSchema.getTable();
-                    table.columns()
-                            .forEach(
-                                    column -> {
-                                        mysqlType.put(
-                                                StringData.fromString(column.name()),
-                                                StringData.fromString(
-                                                        String.format(
-                                                                "%s(%d)",
-                                                                column.typeName(),
-                                                                column.length())));
-                                    });
-
-                    return new GenericMapData(mysqlType);
+                    return new GenericMapData(oracleType);
                 }
             }),
 
@@ -348,17 +314,17 @@ public enum OracleReadableMetaData {
                     if (tableSchema == null) {
                         return null;
                     }
-                    Map<StringData, Integer> mysqlType = new HashMap<>();
+                    Map<StringData, Integer> sqlType = new HashMap<>();
                     final Table table = tableSchema.getTable();
                     table.columns()
                             .forEach(
                                     column -> {
-                                        mysqlType.put(
+                                        sqlType.put(
                                                 StringData.fromString(column.name()),
                                                 column.jdbcType());
                                     });
 
-                    return new GenericMapData(mysqlType);
+                    return new GenericMapData(sqlType);
                 }
             }),
 
@@ -479,16 +445,16 @@ public enum OracleReadableMetaData {
         if (tableSchema == null) {
             return null;
         }
-        Map<String, Integer> mysqlType = new LinkedHashMap<>();
+        Map<String, Integer> sqlType = new LinkedHashMap<>();
         final Table table = tableSchema.getTable();
         table.columns()
                 .forEach(
                         column -> {
-                            mysqlType.put(
+                            sqlType.put(
                                     column.name(),
                                     column.jdbcType());
                         });
-        return mysqlType;
+        return sqlType;
     }
 
     private static String getMetaData(SourceRecord record, String tableNameKey) {