You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/10/24 02:32:50 UTC
[inlong] branch master updated: [INLONG-6256][Sort] Support debezium-json format with schema parse for DebeziumJsonDynamicSchemaFormat (#6259)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 75d0a5d31 [INLONG-6256][Sort] Support debezium-json format with schema parse for DebeziumJsonDynamicSchemaFormat (#6259)
75d0a5d31 is described below
commit 75d0a5d3142e6583f63f08a00826885933533a21
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Mon Oct 24 10:32:44 2022 +0800
[INLONG-6256][Sort] Support debezium-json format with schema parse for DebeziumJsonDynamicSchemaFormat (#6259)
---
.../format/DebeziumJsonDynamicSchemaFormat.java | 215 ++++++++++++----
.../DebeziumJsonDynamicSchemaFormatTest.java | 7 +
...eziumJsonDynamicSchemaFormatWithSchemaTest.java | 275 +++++++++++++++++++++
3 files changed, 448 insertions(+), 49 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
index f3bc603f1..8c9fe2d66 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
@@ -18,13 +18,25 @@
package org.apache.inlong.sort.base.format;
import org.apache.flink.formats.json.JsonToRowDataConverters.JsonToRowDataConverter;
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
-
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/**
* Debezium json dynamic format
@@ -33,17 +45,46 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
private static final String IDENTIFIER = "debezium-json";
private static final String DDL_FLAG = "ddl";
- private static final String SCHEMA = "sqlType";
+ private static final String SCHEMA = "schema";
+ private static final String SQL_TYPE = "sqlType";
private static final String AFTER = "after";
private static final String BEFORE = "before";
private static final String SOURCE = "source";
private static final String PK_NAMES = "pkNames";
private static final String OP_TYPE = "op";
- private static final String OP_READ = "r"; // snapshot read
- private static final String OP_CREATE = "c"; // insert
- private static final String OP_UPDATE = "u"; // update
- private static final String OP_DELETE = "d"; // delete
-
+ private static final String PAYLOAD = "payload";
+ private static final String FIELDS = "fields";
+ private static final String FIELD = "field";
+ private static final String TYPE = "type";
+ /**
+ * Snapshot read
+ */
+ private static final String OP_READ = "r";
+ /**
+ * Insert
+ */
+ private static final String OP_CREATE = "c";
+ /**
+ * Update
+ */
+ private static final String OP_UPDATE = "u";
+ /**
+ * Delete
+ */
+ private static final String OP_DELETE = "d";
+
+ private static final Map<String, LogicalType> DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING =
+ ImmutableMap.<String, LogicalType>builder()
+ .put("BOOLEAN", new BooleanType())
+ .put("INT8", new TinyIntType())
+ .put("INT16", new SmallIntType())
+ .put("INT32", new IntType())
+ .put("INT64", new BigIntType())
+ .put("FLOAT32", new FloatType())
+ .put("FLOAT64", new DoubleType())
+ .put("STRING", new VarCharType())
+ .put("BYTES", new VarBinaryType())
+ .build();
private static final DebeziumJsonDynamicSchemaFormat FORMAT = new DebeziumJsonDynamicSchemaFormat();
@@ -58,70 +99,138 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
@Override
public JsonNode getPhysicalData(JsonNode root) {
- JsonNode physicalData = root.get(AFTER);
- if (physicalData == null) {
- physicalData = root.get(BEFORE);
+ JsonNode payload = root.get(PAYLOAD);
+ if (payload == null) {
+ JsonNode physicalData = root.get(AFTER);
+ if (physicalData == null) {
+ physicalData = root.get(BEFORE);
+ }
+ return physicalData;
}
- return physicalData;
+ return getPhysicalData(payload);
}
@Override
public List<String> extractPrimaryKeyNames(JsonNode data) {
List<String> pkNames = new ArrayList<>();
- JsonNode sourceNode = data.get(SOURCE);
- if (sourceNode == null) {
+ JsonNode payload = data.get(PAYLOAD);
+ if (payload == null) {
+ JsonNode sourceNode = data.get(SOURCE);
+ if (sourceNode == null) {
+ return pkNames;
+ }
+ JsonNode pkNamesNode = sourceNode.get(PK_NAMES);
+ if (pkNamesNode != null && pkNamesNode.isArray()) {
+ for (int i = 0; i < pkNamesNode.size(); i++) {
+ pkNames.add(pkNamesNode.get(i).asText());
+ }
+ }
return pkNames;
}
- JsonNode pkNamesNode = sourceNode.get(PK_NAMES);
- if (pkNamesNode != null && pkNamesNode.isArray()) {
- for (int i = 0; i < pkNamesNode.size(); i++) {
- pkNames.add(pkNamesNode.get(i).asText());
- }
+ return extractPrimaryKeyNames(payload);
+ }
+
+ @Override
+ public String parse(JsonNode rootNode, String pattern) throws IOException {
+ JsonNode payload = rootNode.get(PAYLOAD);
+ if (payload == null) {
+ return super.parse(rootNode, pattern);
}
- return pkNames;
+ return super.parse(payload, pattern);
}
@Override
public boolean extractDDLFlag(JsonNode data) {
- return data.has(DDL_FLAG) ? data.get(DDL_FLAG).asBoolean(false) : false;
+ JsonNode payload = data.get(PAYLOAD);
+ if (payload == null) {
+ return data.has(DDL_FLAG) && data.get(DDL_FLAG).asBoolean(false);
+ }
+ return extractDDLFlag(payload);
+ }
+
+ public RowType extractSchemaFromExtractInfo(JsonNode data, List<String> pkNames) {
+ JsonNode payload = data.get(PAYLOAD);
+ if (payload == null) {
+ JsonNode sourceNode = data.get(SOURCE);
+ if (sourceNode == null) {
+ throw new IllegalArgumentException(String.format("Error schema: %s.", data));
+ }
+ JsonNode schemaNode = sourceNode.get(SQL_TYPE);
+ if (schemaNode == null) {
+ throw new IllegalArgumentException(String.format("Error schema: %s.", data));
+ }
+ return super.extractSchemaNode(schemaNode, pkNames);
+ }
+ return extractSchemaFromExtractInfo(payload, pkNames);
}
@Override
public RowType extractSchema(JsonNode data, List<String> pkNames) {
- JsonNode schema = data.get(SCHEMA);
- return extractSchemaNode(schema, pkNames);
+ // first get schema from 'sqlType', fallback to get it from 'schema'
+ try {
+ return extractSchemaFromExtractInfo(data, pkNames);
+ } catch (IllegalArgumentException e) {
+ JsonNode schema = data.get(SCHEMA);
+ for (JsonNode field : schema.get(FIELDS)) {
+ if (AFTER.equals(field.get(FIELD).asText())) {
+ return extractSchemaNode(field.get(FIELDS), pkNames);
+ }
+ }
+ throw new IllegalArgumentException(String.format("Error schema: %s.", schema));
+ }
}
@Override
- public List<RowData> extractRowData(JsonNode data, RowType rowType) {
- JsonNode opNode = data.get(OP_TYPE);
- JsonNode dataBeforeNode = data.get(BEFORE);
- JsonNode dataAfterNode = data.get(AFTER);
- if (opNode == null || (dataBeforeNode == null && dataAfterNode == null)) {
- throw new IllegalArgumentException(
- String.format("Error opNode: %s, or dataBeforeNode: %s, dataAfterNode",
- opNode, dataBeforeNode, dataAfterNode));
+ public RowType extractSchemaNode(JsonNode schema, List<String> pkNames) {
+ List<RowType.RowField> fields = new ArrayList<>();
+ for (JsonNode field : schema) {
+ String name = field.get(FIELD).asText();
+ LogicalType type = debeziumType2FlinkType(field.get(TYPE).asText());
+ if (pkNames.contains(name)) {
+ type = type.copy(false);
+ }
+ fields.add(new RowType.RowField(name, type));
}
- List<RowData> rowDataList = new ArrayList<>();
- JsonToRowDataConverter rowDataConverter = rowDataConverters.createConverter(rowType);
-
- String op = data.get(OP_TYPE).asText();
- if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
- RowData rowData = (RowData) rowDataConverter.convert(dataAfterNode);
- rowData.setRowKind(RowKind.INSERT);
- rowDataList.add(rowData);
- } else if (OP_UPDATE.equals(op)) {
- RowData rowData = (RowData) rowDataConverter.convert(dataAfterNode);
- rowData.setRowKind(RowKind.UPDATE_AFTER);
- rowDataList.add(rowData);
- } else if (OP_DELETE.equals(op)) {
- RowData rowData = (RowData) rowDataConverter.convert(dataBeforeNode);
- rowData.setRowKind(RowKind.DELETE);
- rowDataList.add(rowData);
- } else {
- throw new IllegalArgumentException("Unsupported op_type: " + op);
+ return new RowType(fields);
+ }
+
+ @Override
+ public List<RowData> extractRowData(JsonNode data, RowType rowType) {
+ JsonNode payload = data.get(PAYLOAD);
+ if (payload == null) {
+ JsonNode opNode = data.get(OP_TYPE);
+ JsonNode dataBeforeNode = data.get(BEFORE);
+ JsonNode dataAfterNode = data.get(AFTER);
+ if (opNode == null || (dataBeforeNode == null && dataAfterNode == null)) {
+ throw new IllegalArgumentException(
+ String.format("Error opNode: %s, or dataBeforeNode: %s, dataAfterNode: %s",
+ opNode, dataBeforeNode, dataAfterNode));
+ }
+ List<RowData> rowDataList = new ArrayList<>();
+ JsonToRowDataConverter rowDataConverter = rowDataConverters.createConverter(rowType);
+
+ String op = data.get(OP_TYPE).asText();
+ if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
+ RowData rowData = (RowData) rowDataConverter.convert(dataAfterNode);
+ rowData.setRowKind(RowKind.INSERT);
+ rowDataList.add(rowData);
+ } else if (OP_UPDATE.equals(op)) {
+ RowData rowData = (RowData) rowDataConverter.convert(dataBeforeNode);
+ rowData.setRowKind(RowKind.UPDATE_BEFORE);
+ rowDataList.add(rowData);
+ rowData = (RowData) rowDataConverter.convert(dataAfterNode);
+ rowData.setRowKind(RowKind.UPDATE_AFTER);
+ rowDataList.add(rowData);
+ } else if (OP_DELETE.equals(op)) {
+ RowData rowData = (RowData) rowDataConverter.convert(dataBeforeNode);
+ rowData.setRowKind(RowKind.DELETE);
+ rowDataList.add(rowData);
+ } else {
+ throw new IllegalArgumentException("Unsupported op_type: " + op);
+ }
+ return rowDataList;
}
- return rowDataList;
+ return extractRowData(payload, rowType);
}
/**
@@ -133,4 +242,12 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
public String identifier() {
return IDENTIFIER;
}
+
+ private LogicalType debeziumType2FlinkType(String debeziumType) {
+ if (DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING.containsKey(debeziumType.toUpperCase())) {
+ return DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING.get(debeziumType.toUpperCase());
+ } else {
+ throw new IllegalArgumentException("Unsupported debeziumType: " + debeziumType.toUpperCase());
+ }
+ }
}
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
index bbd51dac6..5e555a657 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
@@ -88,6 +88,13 @@ public class DebeziumJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBase
Assert.assertEquals(values, Arrays.asList("111", "scooter"));
}
+ @Test
+ public void testExtractPhysicalData() throws IOException {
+ JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+ .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+ Assert.assertNotNull(((JsonDynamicSchemaFormat) getDynamicSchemaFormat()).getPhysicalData(rootNode));
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java
new file mode 100644
index 000000000..fb268b9a7
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test for {@link DebeziumJsonDynamicSchemaFormat}
+ */
+public class DebeziumJsonDynamicSchemaFormatWithSchemaTest extends DynamicSchemaFormatBaseTest<JsonNode> {
+
+ @Override
+ protected String getSource() {
+ return "{\n"
+ + " \"schema\": { \n"
+ + " \"type\": \"struct\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"type\": \"struct\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"type\": \"int32\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"id\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"first_name\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"last_name\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"email\"\n"
+ + " }\n"
+ + " ],\n"
+ + " \"optional\": true,\n"
+ + " \"name\": \"mysql-server-1.inventory.customers.Value\", \n"
+ + " \"field\": \"before\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"struct\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"type\": \"int32\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"id\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"first_name\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"last_name\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"email\"\n"
+ + " }\n"
+ + " ],\n"
+ + " \"optional\": true,\n"
+ + " \"name\": \"mysql-server-1.inventory.customers.Value\",\n"
+ + " \"field\": \"after\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"struct\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"version\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"connector\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"name\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"int64\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"ts_ms\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"boolean\",\n"
+ + " \"optional\": true,\n"
+ + " \"default\": false,\n"
+ + " \"field\": \"snapshot\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"db\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": true,\n"
+ + " \"field\": \"table\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"int64\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"server_id\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": true,\n"
+ + " \"field\": \"gtid\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"file\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"int64\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"pos\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"int32\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"row\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"int64\",\n"
+ + " \"optional\": true,\n"
+ + " \"field\": \"thread\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": true,\n"
+ + " \"field\": \"query\"\n"
+ + " }\n"
+ + " ],\n"
+ + " \"optional\": false,\n"
+ + " \"name\": \"io.debezium.connector.mysql.Source\", \n"
+ + " \"field\": \"source\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"string\",\n"
+ + " \"optional\": false,\n"
+ + " \"field\": \"op\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"type\": \"int64\",\n"
+ + " \"optional\": true,\n"
+ + " \"field\": \"ts_ms\"\n"
+ + " }\n"
+ + " ],\n"
+ + " \"optional\": false,\n"
+ + " \"name\": \"mysql-server-1.inventory.customers.Envelope\" \n"
+ + " },\n"
+ + " \"payload\": { \n"
+ + " \"op\": \"c\", \n"
+ + " \"ts_ms\": 1465491411815, \n"
+ + " \"before\": null, \n"
+ + " \"after\": { \n"
+ + " \"id\": 1004,\n"
+ + " \"first_name\": \"Anne\",\n"
+ + " \"last_name\": \"Kretchmar\",\n"
+ + " \"email\": \"annek@noanswer.org\"\n"
+ + " },\n"
+ + " \"source\": { \n"
+ + " \"version\": \"1.9.6.Final\",\n"
+ + " \"connector\": \"mysql\",\n"
+ + " \"name\": \"mysql-server-1\",\n"
+ + " \"ts_ms\": 0,\n"
+ + " \"pkNames\":[\"id\", \"first_name\"],"
+ + " \"snapshot\": false,\n"
+ + " \"db\": \"inventory\",\n"
+ + " \"table\": \"customers\",\n"
+ + " \"server_id\": 0,\n"
+ + " \"gtid\": null,\n"
+ + " \"file\": \"mysql-bin.000003\",\n"
+ + " \"pos\": 154,\n"
+ + " \"row\": 0,\n"
+ + " \"thread\": 7,\n"
+ + " \"query\": \"INSERT INTO customers (first_name, last_name, email)"
+ + " VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')\"\n"
+ + " }\n"
+ + " }\n"
+ + "}";
+ }
+
+ @Override
+ protected Map<String, String> getExpectedValues() {
+ Map<String, String> expectedValues = new HashMap<>();
+ expectedValues.put("${source.db}${source.table}", "inventorycustomers");
+ expectedValues.put("${source.db}_${source.table}", "inventory_customers");
+ expectedValues.put("prefix_${source.db}_${source.table}_suffix", "prefix_inventory_customers_suffix");
+ expectedValues.put("${ \t source.db \t }${ source.table }", "inventorycustomers");
+ expectedValues.put("${source.db}_${source.table}_${id}_${first_name}", "inventory_customers_1004_Anne");
+ return expectedValues;
+ }
+
+ @Test
+ @SuppressWarnings({"unchecked"})
+ public void testExtractPrimaryKey() throws IOException {
+ JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+ .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+ List<String> primaryKeys = getDynamicSchemaFormat().extractPrimaryKeyNames(rootNode);
+ List<String> values = getDynamicSchemaFormat().extractValues(rootNode, primaryKeys.toArray(new String[]{}));
+ Assert.assertEquals(values, Arrays.asList("1004", "Anne"));
+ }
+
+ @Test
+ public void testExtractPhysicalData() throws IOException {
+ JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+ .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+ Assert.assertNotNull(((JsonDynamicSchemaFormat) getDynamicSchemaFormat()).getPhysicalData(rootNode));
+ }
+
+ @Test
+ public void testExtractRowType() throws IOException {
+ JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+ .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+ String[] names = new String[]{"id", "first_name", "last_name", "email"};
+ LogicalType[] types = new LogicalType[]{
+ new IntType(false),
+ new VarCharType(false, 1),
+ new VarCharType(),
+ new VarCharType()
+ };
+ RowType rowType = RowType.of(true, types, names);
+ Assert.assertEquals(getDynamicSchemaFormat().extractSchema(rootNode), rowType);
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Override
+ protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
+ return DebeziumJsonDynamicSchemaFormat.getInstance();
+ }
+}