You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/07 03:24:39 UTC
[inlong] 01/01: [INLONG-6402][Sort] Modify the metadata field of Oracle connector (#6404)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 622676ed6024183188369ef9856f2f0b12958d7b
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Mon Nov 7 11:17:39 2022 +0800
[INLONG-6402][Sort] Modify the metadata field of Oracle connector (#6404)
---
.../org/apache/inlong/common/enums/MetaField.java | 5 +
.../org/apache/inlong/sort/protocol/Metadata.java | 11 +-
.../protocol/node/extract/OracleExtractNode.java | 55 ++++++-
.../node/extract/OracleExtractNodeTest.java | 14 +-
.../cdc/oracle/table/OracleReadableMetaData.java | 166 ++++++++-------------
5 files changed, 137 insertions(+), 114 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
index f88da9c4d..4a065583d 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
@@ -116,6 +116,11 @@ public enum MetaField {
*/
MYSQL_TYPE,
+ /**
+ * The table structure. It is only used for the Oracle database.
+ */
+ ORACLE_TYPE,
+
/**
* Primary key field name. Currently, it is used for MySQL database.
*/
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
index 71c37e4ae..6c2ac4d2d 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
@@ -54,14 +54,6 @@ public interface Metadata {
case OP_TS:
metadataKey = "op_ts";
break;
- case DATA:
- case DATA_BYTES:
- metadataKey = "meta.data";
- break;
- case DATA_CANAL:
- case DATA_BYTES_CANAL:
- metadataKey = "meta.data_canal";
- break;
default:
throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
@@ -106,6 +98,9 @@ public interface Metadata {
case MYSQL_TYPE:
metadataType = "MAP<STRING, STRING>";
break;
+ case ORACLE_TYPE:
+ metadataType = "MAP<STRING, STRING>";
+ break;
case PK_NAMES:
metadataType = "ARRAY<STRING>";
break;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java
index a1bad2523..594ccdd60 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNode.java
@@ -150,7 +150,58 @@ public class OracleExtractNode extends ExtractNode implements InlongMetric, Meta
@Override
public Set<MetaField> supportedMetaFields() {
return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.DATABASE_NAME,
- MetaField.SCHEMA_NAME, MetaField.OP_TS, MetaField.DATA, MetaField.DATA_CANAL,
- MetaField.DATA_BYTES, MetaField.DATA_BYTES_CANAL);
+ MetaField.SCHEMA_NAME, MetaField.OP_TS, MetaField.OP_TYPE, MetaField.DATA, MetaField.DATA_BYTES,
+ MetaField.DATA_CANAL, MetaField.DATA_BYTES_CANAL, MetaField.IS_DDL, MetaField.TS,
+ MetaField.SQL_TYPE, MetaField.ORACLE_TYPE, MetaField.PK_NAMES);
}
+
+ @Override
+ public String getMetadataKey(MetaField metaField) {
+ String metadataKey;
+ switch (metaField) {
+ case TABLE_NAME:
+ metadataKey = "meta.table_name";
+ break;
+ case DATABASE_NAME:
+ metadataKey = "meta.database_name";
+ break;
+ case SCHEMA_NAME:
+ metadataKey = "meta.schema_name";
+ break;
+ case OP_TS:
+ metadataKey = "meta.op_ts";
+ break;
+ case OP_TYPE:
+ metadataKey = "meta.op_type";
+ break;
+ case DATA:
+ case DATA_BYTES:
+ metadataKey = "meta.data";
+ break;
+ case DATA_CANAL:
+ case DATA_BYTES_CANAL:
+ metadataKey = "meta.data_canal";
+ break;
+ case IS_DDL:
+ metadataKey = "meta.is_ddl";
+ break;
+ case TS:
+ metadataKey = "meta.ts";
+ break;
+ case SQL_TYPE:
+ metadataKey = "meta.sql_type";
+ break;
+ case ORACLE_TYPE:
+ metadataKey = "meta.oracle_type";
+ break;
+ case PK_NAMES:
+ metadataKey = "meta.pk_names";
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+ this.getClass().getSimpleName(), metaField));
+ }
+ return metadataKey;
+ }
+
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNodeTest.java
index 98394467b..3ddd10c85 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/OracleExtractNodeTest.java
@@ -50,14 +50,20 @@ public class OracleExtractNodeTest extends SerializeBaseTest<OracleExtractNode>
public void testMetaFields() {
Map<MetaField, String> formatMap = new HashMap<>();
formatMap.put(MetaField.PROCESS_TIME, "AS PROCTIME()");
- formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 'table_name' VIRTUAL");
- formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'database_name' VIRTUAL");
- formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL");
- formatMap.put(MetaField.SCHEMA_NAME, "STRING METADATA FROM 'schema_name' VIRTUAL");
+ formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 'meta.table_name' VIRTUAL");
+ formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'meta.database_name' VIRTUAL");
+ formatMap.put(MetaField.SCHEMA_NAME, "STRING METADATA FROM 'meta.schema_name' VIRTUAL");
+ formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 'meta.op_ts' VIRTUAL");
+ formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'meta.op_type' VIRTUAL");
formatMap.put(MetaField.DATA, "STRING METADATA FROM 'meta.data' VIRTUAL");
formatMap.put(MetaField.DATA_CANAL, "STRING METADATA FROM 'meta.data_canal' VIRTUAL");
formatMap.put(MetaField.DATA_BYTES, "BYTES METADATA FROM 'meta.data' VIRTUAL");
formatMap.put(MetaField.DATA_BYTES_CANAL, "BYTES METADATA FROM 'meta.data_canal' VIRTUAL");
+ formatMap.put(MetaField.IS_DDL, "BOOLEAN METADATA FROM 'meta.is_ddl' VIRTUAL");
+ formatMap.put(MetaField.TS, "TIMESTAMP_LTZ(3) METADATA FROM 'meta.ts' VIRTUAL");
+ formatMap.put(MetaField.SQL_TYPE, "MAP<STRING, INT> METADATA FROM 'meta.sql_type' VIRTUAL");
+ formatMap.put(MetaField.ORACLE_TYPE, "MAP<STRING, STRING> METADATA FROM 'meta.oracle_type' VIRTUAL");
+ formatMap.put(MetaField.PK_NAMES, "ARRAY<STRING> METADATA FROM 'meta.pk_names' VIRTUAL");
OracleExtractNode node = getTestObject();
boolean formatEquals = true;
for (MetaField metaField : node.supportedMetaFields()) {
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
index d40758b45..1b48a5870 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
@@ -47,7 +47,9 @@ import org.apache.kafka.connect.source.SourceRecord;
/** Defines the supported metadata columns for {@link OracleTableSource}. */
public enum OracleReadableMetaData {
- /** Name of the table that contain the row. */
+ /**
+ * Name of the table that contains the row.
+ */
TABLE_NAME(
"table_name",
DataTypes.STRING().notNull(),
@@ -59,7 +61,10 @@ public enum OracleReadableMetaData {
return StringData.fromString(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY));
}
}),
- /** Name of the schema that contain the row. */
+
+ /**
+ * Name of the schema that contains the row.
+ */
SCHEMA_NAME(
"schema_name",
DataTypes.STRING().notNull(),
@@ -72,7 +77,9 @@ public enum OracleReadableMetaData {
}
}),
- /** Name of the database that contain the row. */
+ /**
+ * Name of the database that contains the row.
+ */
DATABASE_NAME(
"database_name",
DataTypes.STRING().notNull(),
@@ -86,72 +93,50 @@ public enum OracleReadableMetaData {
}),
/**
- * It indicates the time that the change was made in the database. If the record is read from
- * snapshot of the table instead of the change stream, the value is always 0.
+ * It indicates the time that the change was made in the database.
*/
OP_TS(
"op_ts",
- DataTypes.TIMESTAMP_LTZ(3).notNull(),
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct messageStruct = (Struct) record.value();
- Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+ Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
return TimestampData.fromEpochMillis(
(Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
}
}),
- DATA(
- "meta.data",
- DataTypes.STRING(),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object read(SourceRecord record) {
- return null;
- }
-
- @Override
- public Object read(SourceRecord record,
- @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
- return getCanalData(record, tableSchema, (GenericRowData) rowData);
- }
- }),
-
- DATA_CANAL(
- "meta.data_canal",
- DataTypes.STRING(),
+ /**
+ * Name of the table that contains the row.
+ */
+ META_TABLE_NAME(
+ "meta.table_name",
+ DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
- return null;
- }
-
- @Override
- public Object read(SourceRecord record,
- @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
- return getCanalData(record, tableSchema, (GenericRowData) rowData);
+ return StringData.fromString(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY));
}
}),
/**
- * Name of the table that contain the row. .
+ * Name of the schema that contains the row.
*/
- META_TABLE_NAME(
- "meta.table_name",
+ META_SCHEMA_NAME(
+ "meta.schema_name",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
- return StringData.fromString(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY));
+ return StringData.fromString(getMetaData(record, AbstractSourceInfo.SCHEMA_NAME_KEY));
}
}),
@@ -171,8 +156,7 @@ public enum OracleReadableMetaData {
}),
/**
- * It indicates the time that the change was made in the database. If the record is read from
- * snapshot of the table instead of the binlog, the value is always 0.
+ * It indicates the time that the change was made in the database.
*/
META_OP_TS(
"meta.op_ts",
@@ -189,79 +173,74 @@ public enum OracleReadableMetaData {
}
}),
- /**
- * Operation type, INSERT/UPDATE/DELETE.
- */
- OP_TYPE(
- "meta.op_type",
- DataTypes.STRING().notNull(),
+ DATA(
+ "meta.data",
+ DataTypes.STRING(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
- return StringData.fromString(getOpType(record));
+ return null;
+ }
+
+ @Override
+ public Object read(SourceRecord record,
+ @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
+ return getCanalData(record, tableSchema, (GenericRowData) rowData);
}
}),
- /**
- * Not important, a simple increment counter.
- */
- BATCH_ID(
- "meta.batch_id",
- DataTypes.BIGINT().nullable(),
+ DATA_CANAL(
+ "meta.data_canal",
+ DataTypes.STRING(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
- private long id = 0;
-
@Override
public Object read(SourceRecord record) {
- return id++;
+ return null;
+ }
+
+ @Override
+ public Object read(SourceRecord record,
+ @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
+ return getCanalData(record, tableSchema, (GenericRowData) rowData);
}
}),
/**
- * Source does not emit ddl data.
+ * Operation type, INSERT/UPDATE/DELETE.
*/
- IS_DDL(
- "meta.is_ddl",
- DataTypes.BOOLEAN().notNull(),
+ OP_TYPE(
+ "meta.op_type",
+ DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
- return false;
+ return StringData.fromString(getOpType(record));
}
}),
/**
- * The update-before data for UPDATE record.
+ * Source does not emit ddl data.
*/
- OLD(
- "meta.update_before",
- DataTypes.ARRAY(
- DataTypes.MAP(
- DataTypes.STRING().nullable(),
- DataTypes.STRING().nullable())
- .nullable())
- .nullable(),
+ IS_DDL(
+ "meta.is_ddl",
+ DataTypes.BOOLEAN().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
- final Envelope.Operation op = Envelope.operationFor(record);
- if (op != Envelope.Operation.UPDATE) {
- return null;
- }
- return record;
+ return false;
}
}),
- MYSQL_TYPE(
- "meta.mysql_type",
+ ORACLE_TYPE(
+ "meta.oracle_type",
DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).nullable(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@@ -274,24 +253,11 @@ public enum OracleReadableMetaData {
@Override
public Object read(
SourceRecord record, @Nullable TableChanges.TableChange tableSchema) {
- if (tableSchema == null) {
+ Map<String, String> oracleType = getOracleType(tableSchema);
+ if (oracleType == null) {
return null;
}
- Map<StringData, StringData> mysqlType = new HashMap<>();
- final Table table = tableSchema.getTable();
- table.columns()
- .forEach(
- column -> {
- mysqlType.put(
- StringData.fromString(column.name()),
- StringData.fromString(
- String.format(
- "%s(%d)",
- column.typeName(),
- column.length())));
- });
-
- return new GenericMapData(mysqlType);
+ return new GenericMapData(oracleType);
}
}),
@@ -348,17 +314,17 @@ public enum OracleReadableMetaData {
if (tableSchema == null) {
return null;
}
- Map<StringData, Integer> mysqlType = new HashMap<>();
+ Map<StringData, Integer> sqlType = new HashMap<>();
final Table table = tableSchema.getTable();
table.columns()
.forEach(
column -> {
- mysqlType.put(
+ sqlType.put(
StringData.fromString(column.name()),
column.jdbcType());
});
- return new GenericMapData(mysqlType);
+ return new GenericMapData(sqlType);
}
}),
@@ -479,16 +445,16 @@ public enum OracleReadableMetaData {
if (tableSchema == null) {
return null;
}
- Map<String, Integer> mysqlType = new LinkedHashMap<>();
+ Map<String, Integer> sqlType = new LinkedHashMap<>();
final Table table = tableSchema.getTable();
table.columns()
.forEach(
column -> {
- mysqlType.put(
+ sqlType.put(
column.name(),
column.jdbcType());
});
- return mysqlType;
+ return sqlType;
}
private static String getMetaData(SourceRecord record, String tableNameKey) {