You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/08 07:02:53 UTC
[inlong] branch master updated: [INLONG-7543][Sort] Fix PostgreSQL connector output two data with the same UPDATE operation (#7544)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 459be730e [INLONG-7543][Sort] Fix PostgreSQL connector output two data with the same UPDATE operation (#7544)
459be730e is described below
commit 459be730e2f4f0062ab743e3bb0e7c11498df90b
Author: Liao Rui <li...@users.noreply.github.com>
AuthorDate: Wed Mar 8 15:02:46 2023 +0800
[INLONG-7543][Sort] Fix PostgreSQL connector output two data with the same UPDATE operation (#7544)
Co-authored-by: ryanrliao <ry...@tencent.com>
---
.../postgres/table/PostgreSQLReadableMetaData.java | 90 +++++++++++++++-------
1 file changed, 64 insertions(+), 26 deletions(-)
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java
index b2dacf9ce..e035c4a93 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java
@@ -165,7 +165,7 @@ public enum PostgreSQLReadableMetaData {
.name(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY)).sqlType(getSqlType(tableSchema))
.pkNames(getPkNames(tableSchema)).build();
DebeziumJson debeziumJson = DebeziumJson.builder().after(field).source(source)
- .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(record))
+ .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(data))
.tableChange(tableSchema).build();
try {
@@ -373,9 +373,9 @@ public enum PostgreSQLReadableMetaData {
/**
* Generate a canal json message
*
- * @param record
- * @param tableSchema
- * @param rowData
+ * @param record source record
+ * @param tableSchema table schema
+ * @param rowData row data with canal json or debezium json
* @return
*/
private static StringData getCanalData(SourceRecord record, TableChange tableSchema, GenericRowData rowData) {
@@ -398,9 +398,18 @@ public enum PostgreSQLReadableMetaData {
List<Map<String, Object>> dataList = new ArrayList<>();
dataList.add(field);
- CanalJson canalJson = CanalJson.builder().data(dataList).database(databaseName).schema(schemaName).sql("")
- .es(opTs).isDdl(false).pkNames(getPkNames(tableSchema)).table(tableName).ts(ts).type(getOpType(record))
- .sqlType(getSqlType(tableSchema)).build();
+ CanalJson canalJson = CanalJson.builder()
+ .data(dataList)
+ .database(databaseName)
+ .schema(schemaName)
+ .sql("")
+ .es(opTs).isDdl(false)
+ .pkNames(getPkNames(tableSchema))
+ .table(tableName)
+ .ts(ts)
+ .type(getCanalOpType(rowData))
+ .sqlType(getSqlType(tableSchema))
+ .build();
try {
ObjectMapper objectMapper = new ObjectMapper();
return StringData.fromString(objectMapper.writeValueAsString(canalJson));
@@ -426,8 +435,8 @@ public enum PostgreSQLReadableMetaData {
/**
* convert debezium operation to canal-json operation type
*
- * @param record
- * @return
+ * @param record source record
+ * @return source table DML operation type
*/
private static String getOpType(SourceRecord record) {
String opType;
@@ -445,8 +454,8 @@ public enum PostgreSQLReadableMetaData {
/**
* get primary key names
*
- * @param tableSchema
- * @return
+ * @param tableSchema table schema
+ * @return primary key column names
*/
private static List<String> getPkNames(@Nullable TableChange tableSchema) {
if (tableSchema == null) {
@@ -458,8 +467,8 @@ public enum PostgreSQLReadableMetaData {
/**
* get a map about column name and type
*
- * @param tableSchema
- * @return
+ * @param tableSchema table schema
+ * @return map of field name and field type
*/
public static Map<String, Integer> getSqlType(@Nullable TableChange tableSchema) {
if (tableSchema == null) {
@@ -473,21 +482,50 @@ public enum PostgreSQLReadableMetaData {
return postgresType;
}
+ /**
+ * convert canal operation to canal-json operation type
+ *
+ * @param record raw data with canal json format
+ * @return source table DML operation type
+ */
+ private static String getCanalOpType(GenericRowData record) {
+ String opType;
+ switch (record.getRowKind()) {
+ case DELETE:
+ case UPDATE_BEFORE:
+ opType = "DELETE";
+ break;
+ case INSERT:
+ case UPDATE_AFTER:
+ opType = "INSERT";
+ break;
+ default:
+ throw new IllegalStateException("the record only have states in DELETE, "
+ + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+ }
+ return opType;
+ }
+
/**
* convert debezium operation to debezium-json operation type
*
- * @param record
- * @return
+ * @param record raw data with debezium json format
+ * @return source table DML operation type
*/
- private static String getDebeziumOpType(SourceRecord record) {
+ private static String getDebeziumOpType(GenericRowData record) {
String opType;
- final Envelope.Operation op = Envelope.operationFor(record);
- if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
- opType = "c";
- } else if (op == Envelope.Operation.DELETE) {
- opType = "d";
- } else {
- opType = "u";
+ switch (record.getRowKind()) {
+ case DELETE:
+ case UPDATE_BEFORE:
+ opType = "d";
+ break;
+ case INSERT:
+ case UPDATE_AFTER:
+ opType = "c";
+ break;
+ default:
+ throw new IllegalStateException("the record only have states in DELETE, "
+ + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
}
return opType;
}
@@ -495,9 +533,9 @@ public enum PostgreSQLReadableMetaData {
/**
* get meta info from debezium-json data stream
*
- * @param record
- * @param tableNameKey
- * @return
+ * @param record source record
+ * @param tableNameKey key in the source record
+ * @return value of the key in the source record
*/
private static String getMetaData(SourceRecord record, String tableNameKey) {
Struct messageStruct = (Struct) record.value();