You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/10/24 02:32:50 UTC

[inlong] branch master updated: [INLONG-6256][Sort] Support debezium-json format with schema parse for DebeziumJsonDynamicSchemaFormat (#6259)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 75d0a5d31 [INLONG-6256][Sort] Support debezium-json format with schema parse for DebeziumJsonDynamicSchemaFormat (#6259)
75d0a5d31 is described below

commit 75d0a5d3142e6583f63f08a00826885933533a21
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Mon Oct 24 10:32:44 2022 +0800

    [INLONG-6256][Sort] Support debezium-json format with schema parse for DebeziumJsonDynamicSchemaFormat (#6259)
---
 .../format/DebeziumJsonDynamicSchemaFormat.java    | 215 ++++++++++++----
 .../DebeziumJsonDynamicSchemaFormatTest.java       |   7 +
 ...eziumJsonDynamicSchemaFormatWithSchemaTest.java | 275 +++++++++++++++++++++
 3 files changed, 448 insertions(+), 49 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
index f3bc603f1..8c9fe2d66 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
@@ -18,13 +18,25 @@
 package org.apache.inlong.sort.base.format;
 
 import org.apache.flink.formats.json.JsonToRowDataConverters.JsonToRowDataConverter;
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.RowKind;
-
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Debezium json dynamic format
@@ -33,17 +45,46 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
 
     private static final String IDENTIFIER = "debezium-json";
     private static final String DDL_FLAG = "ddl";
-    private static final String SCHEMA = "sqlType";
+    private static final String SCHEMA = "schema";
+    private static final String SQL_TYPE = "sqlType";
     private static final String AFTER = "after";
     private static final String BEFORE = "before";
     private static final String SOURCE = "source";
     private static final String PK_NAMES = "pkNames";
     private static final String OP_TYPE = "op";
-    private static final String OP_READ = "r"; // snapshot read
-    private static final String OP_CREATE = "c"; // insert
-    private static final String OP_UPDATE = "u"; // update
-    private static final String OP_DELETE = "d"; // delete
-
+    private static final String PAYLOAD = "payload";
+    private static final String FIELDS = "fields";
+    private static final String FIELD = "field";
+    private static final String TYPE = "type";
+    /**
+     * Snapshot read
+     */
+    private static final String OP_READ = "r";
+    /**
+     * Insert
+     */
+    private static final String OP_CREATE = "c";
+    /**
+     * Update
+     */
+    private static final String OP_UPDATE = "u";
+    /**
+     * Delete
+     */
+    private static final String OP_DELETE = "d";
+
+    private static final Map<String, LogicalType> DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING =
+            ImmutableMap.<String, LogicalType>builder()
+                    .put("BOOLEAN", new BooleanType())
+                    .put("INT8", new TinyIntType())
+                    .put("INT16", new SmallIntType())
+                    .put("INT32", new IntType())
+                    .put("INT64", new BigIntType())
+                    .put("FLOAT32", new FloatType())
+                    .put("FLOAT64", new DoubleType())
+                    .put("STRING", new VarCharType())
+                    .put("BYTES", new VarBinaryType())
+                    .build();
 
     private static final DebeziumJsonDynamicSchemaFormat FORMAT = new DebeziumJsonDynamicSchemaFormat();
 
@@ -58,70 +99,138 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
 
     @Override
     public JsonNode getPhysicalData(JsonNode root) {
-        JsonNode physicalData = root.get(AFTER);
-        if (physicalData == null) {
-            physicalData = root.get(BEFORE);
+        JsonNode payload = root.get(PAYLOAD);
+        if (payload == null) {
+            JsonNode physicalData = root.get(AFTER);
+            if (physicalData == null) {
+                physicalData = root.get(BEFORE);
+            }
+            return physicalData;
         }
-        return physicalData;
+        return getPhysicalData(payload);
     }
 
     @Override
     public List<String> extractPrimaryKeyNames(JsonNode data) {
         List<String> pkNames = new ArrayList<>();
-        JsonNode sourceNode = data.get(SOURCE);
-        if (sourceNode == null) {
+        JsonNode payload = data.get(PAYLOAD);
+        if (payload == null) {
+            JsonNode sourceNode = data.get(SOURCE);
+            if (sourceNode == null) {
+                return pkNames;
+            }
+            JsonNode pkNamesNode = sourceNode.get(PK_NAMES);
+            if (pkNamesNode != null && pkNamesNode.isArray()) {
+                for (int i = 0; i < pkNamesNode.size(); i++) {
+                    pkNames.add(pkNamesNode.get(i).asText());
+                }
+            }
             return pkNames;
         }
-        JsonNode pkNamesNode = sourceNode.get(PK_NAMES);
-        if (pkNamesNode != null && pkNamesNode.isArray()) {
-            for (int i = 0; i < pkNamesNode.size(); i++) {
-                pkNames.add(pkNamesNode.get(i).asText());
-            }
+        return extractPrimaryKeyNames(payload);
+    }
+
+    @Override
+    public String parse(JsonNode rootNode, String pattern) throws IOException {
+        JsonNode payload = rootNode.get(PAYLOAD);
+        if (payload == null) {
+            return super.parse(rootNode, pattern);
         }
-        return pkNames;
+        return super.parse(payload, pattern);
     }
 
     @Override
     public boolean extractDDLFlag(JsonNode data) {
-        return data.has(DDL_FLAG) ? data.get(DDL_FLAG).asBoolean(false) : false;
+        JsonNode payload = data.get(PAYLOAD);
+        if (payload == null) {
+            return data.has(DDL_FLAG) && data.get(DDL_FLAG).asBoolean(false);
+        }
+        return extractDDLFlag(payload);
+    }
+
+    public RowType extractSchemaFromExtractInfo(JsonNode data, List<String> pkNames) {
+        JsonNode payload = data.get(PAYLOAD);
+        if (payload == null) {
+            JsonNode sourceNode = data.get(SOURCE);
+            if (sourceNode == null) {
+                throw new IllegalArgumentException(String.format("Error schema: %s.", data));
+            }
+            JsonNode schemaNode = sourceNode.get(SQL_TYPE);
+            if (schemaNode == null) {
+                throw new IllegalArgumentException(String.format("Error schema: %s.", data));
+            }
+            return super.extractSchemaNode(schemaNode, pkNames);
+        }
+        return extractSchemaFromExtractInfo(payload, pkNames);
     }
 
     @Override
     public RowType extractSchema(JsonNode data, List<String> pkNames) {
-        JsonNode schema = data.get(SCHEMA);
-        return extractSchemaNode(schema, pkNames);
+        // first get schema from 'sqlType', fallback to get it from 'schema'
+        try {
+            return extractSchemaFromExtractInfo(data, pkNames);
+        } catch (IllegalArgumentException e) {
+            JsonNode schema = data.get(SCHEMA);
+            for (JsonNode field : schema.get(FIELDS)) {
+                if (AFTER.equals(field.get(FIELD).asText())) {
+                    return extractSchemaNode(field.get(FIELDS), pkNames);
+                }
+            }
+            throw new IllegalArgumentException(String.format("Error schema: %s.", schema));
+        }
     }
 
     @Override
-    public List<RowData> extractRowData(JsonNode data, RowType rowType) {
-        JsonNode opNode = data.get(OP_TYPE);
-        JsonNode dataBeforeNode = data.get(BEFORE);
-        JsonNode dataAfterNode = data.get(AFTER);
-        if (opNode == null || (dataBeforeNode == null && dataAfterNode == null)) {
-            throw new IllegalArgumentException(
-                    String.format("Error opNode: %s, or dataBeforeNode: %s, dataAfterNode",
-                            opNode, dataBeforeNode, dataAfterNode));
+    public RowType extractSchemaNode(JsonNode schema, List<String> pkNames) {
+        List<RowType.RowField> fields = new ArrayList<>();
+        for (JsonNode field : schema) {
+            String name = field.get(FIELD).asText();
+            LogicalType type = debeziumType2FlinkType(field.get(TYPE).asText());
+            if (pkNames.contains(name)) {
+                type = type.copy(false);
+            }
+            fields.add(new RowType.RowField(name, type));
         }
-        List<RowData> rowDataList = new ArrayList<>();
-        JsonToRowDataConverter rowDataConverter = rowDataConverters.createConverter(rowType);
-
-        String op = data.get(OP_TYPE).asText();
-        if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
-            RowData rowData = (RowData) rowDataConverter.convert(dataAfterNode);
-            rowData.setRowKind(RowKind.INSERT);
-            rowDataList.add(rowData);
-        } else if (OP_UPDATE.equals(op)) {
-            RowData rowData = (RowData) rowDataConverter.convert(dataAfterNode);
-            rowData.setRowKind(RowKind.UPDATE_AFTER);
-            rowDataList.add(rowData);
-        } else if (OP_DELETE.equals(op)) {
-            RowData rowData = (RowData) rowDataConverter.convert(dataBeforeNode);
-            rowData.setRowKind(RowKind.DELETE);
-            rowDataList.add(rowData);
-        } else {
-            throw new IllegalArgumentException("Unsupported op_type: " + op);
+        return new RowType(fields);
+    }
+
+    @Override
+    public List<RowData> extractRowData(JsonNode data, RowType rowType) {
+        JsonNode payload = data.get(PAYLOAD);
+        if (payload == null) {
+            JsonNode opNode = data.get(OP_TYPE);
+            JsonNode dataBeforeNode = data.get(BEFORE);
+            JsonNode dataAfterNode = data.get(AFTER);
+            if (opNode == null || (dataBeforeNode == null && dataAfterNode == null)) {
+                throw new IllegalArgumentException(
+                        String.format("Error opNode: %s, or dataBeforeNode: %s, dataAfterNode: %s",
+                                opNode, dataBeforeNode, dataAfterNode));
+            }
+            List<RowData> rowDataList = new ArrayList<>();
+            JsonToRowDataConverter rowDataConverter = rowDataConverters.createConverter(rowType);
+
+            String op = data.get(OP_TYPE).asText();
+            if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
+                RowData rowData = (RowData) rowDataConverter.convert(dataAfterNode);
+                rowData.setRowKind(RowKind.INSERT);
+                rowDataList.add(rowData);
+            } else if (OP_UPDATE.equals(op)) {
+                RowData rowData = (RowData) rowDataConverter.convert(dataBeforeNode);
+                rowData.setRowKind(RowKind.UPDATE_BEFORE);
+                rowDataList.add(rowData);
+                rowData = (RowData) rowDataConverter.convert(dataAfterNode);
+                rowData.setRowKind(RowKind.UPDATE_AFTER);
+                rowDataList.add(rowData);
+            } else if (OP_DELETE.equals(op)) {
+                RowData rowData = (RowData) rowDataConverter.convert(dataBeforeNode);
+                rowData.setRowKind(RowKind.DELETE);
+                rowDataList.add(rowData);
+            } else {
+                throw new IllegalArgumentException("Unsupported op_type: " + op);
+            }
+            return rowDataList;
         }
-        return rowDataList;
+        return extractRowData(payload, rowType);
     }
 
     /**
@@ -133,4 +242,12 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
     public String identifier() {
         return IDENTIFIER;
     }
+
+    private LogicalType debeziumType2FlinkType(String debeziumType) {
+        if (DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING.containsKey(debeziumType.toUpperCase())) {
+            return DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING.get(debeziumType.toUpperCase());
+        } else {
+            throw new IllegalArgumentException("Unsupported debeziumType: " + debeziumType.toUpperCase());
+        }
+    }
 }
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
index bbd51dac6..5e555a657 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
@@ -88,6 +88,13 @@ public class DebeziumJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBase
         Assert.assertEquals(values, Arrays.asList("111", "scooter"));
     }
 
+    @Test
+    public void testExtractPhysicalData() throws IOException {
+        JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+                .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+        Assert.assertNotNull(((JsonDynamicSchemaFormat) getDynamicSchemaFormat()).getPhysicalData(rootNode));
+    }
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java
new file mode 100644
index 000000000..fb268b9a7
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test for {@link DebeziumJsonDynamicSchemaFormat}
+ */
+public class DebeziumJsonDynamicSchemaFormatWithSchemaTest extends DynamicSchemaFormatBaseTest<JsonNode> {
+
+    @Override
+    protected String getSource() {
+        return "{\n"
+                + "  \"schema\": { \n"
+                + "    \"type\": \"struct\",\n"
+                + "    \"fields\": [\n"
+                + "      {\n"
+                + "        \"type\": \"struct\",\n"
+                + "        \"fields\": [\n"
+                + "          {\n"
+                + "            \"type\": \"int32\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"id\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"string\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"first_name\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"string\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"last_name\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"string\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"email\"\n"
+                + "          }\n"
+                + "        ],\n"
+                + "        \"optional\": true,\n"
+                + "        \"name\": \"mysql-server-1.inventory.customers.Value\", \n"
+                + "        \"field\": \"before\"\n"
+                + "      },\n"
+                + "      {\n"
+                + "        \"type\": \"struct\",\n"
+                + "        \"fields\": [\n"
+                + "          {\n"
+                + "            \"type\": \"int32\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"id\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"string\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"first_name\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"string\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"last_name\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"string\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"email\"\n"
+                + "          }\n"
+                + "        ],\n"
+                + "        \"optional\": true,\n"
+                + "        \"name\": \"mysql-server-1.inventory.customers.Value\",\n"
+                + "        \"field\": \"after\"\n"
+                + "      },\n"
+                + "      {\n"
+                + "        \"type\": \"struct\",\n"
+                + "        \"fields\": [\n"
+                + "          {\n"
+                + "            \"type\": \"string\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"version\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"string\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"connector\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"string\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"name\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"int64\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"ts_ms\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"boolean\",\n"
+                + "            \"optional\": true,\n"
+                + "            \"default\": false,\n"
+                + "            \"field\": \"snapshot\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"string\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"db\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"string\",\n"
+                + "            \"optional\": true,\n"
+                + "            \"field\": \"table\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"int64\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"server_id\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"string\",\n"
+                + "            \"optional\": true,\n"
+                + "            \"field\": \"gtid\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"string\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"file\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"int64\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"pos\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"int32\",\n"
+                + "            \"optional\": false,\n"
+                + "            \"field\": \"row\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"int64\",\n"
+                + "            \"optional\": true,\n"
+                + "            \"field\": \"thread\"\n"
+                + "          },\n"
+                + "          {\n"
+                + "            \"type\": \"string\",\n"
+                + "            \"optional\": true,\n"
+                + "            \"field\": \"query\"\n"
+                + "          }\n"
+                + "        ],\n"
+                + "        \"optional\": false,\n"
+                + "        \"name\": \"io.debezium.connector.mysql.Source\", \n"
+                + "        \"field\": \"source\"\n"
+                + "      },\n"
+                + "      {\n"
+                + "        \"type\": \"string\",\n"
+                + "        \"optional\": false,\n"
+                + "        \"field\": \"op\"\n"
+                + "      },\n"
+                + "      {\n"
+                + "        \"type\": \"int64\",\n"
+                + "        \"optional\": true,\n"
+                + "        \"field\": \"ts_ms\"\n"
+                + "      }\n"
+                + "    ],\n"
+                + "    \"optional\": false,\n"
+                + "    \"name\": \"mysql-server-1.inventory.customers.Envelope\" \n"
+                + "  },\n"
+                + "  \"payload\": { \n"
+                + "    \"op\": \"c\", \n"
+                + "    \"ts_ms\": 1465491411815, \n"
+                + "    \"before\": null, \n"
+                + "    \"after\": { \n"
+                + "      \"id\": 1004,\n"
+                + "      \"first_name\": \"Anne\",\n"
+                + "      \"last_name\": \"Kretchmar\",\n"
+                + "      \"email\": \"annek@noanswer.org\"\n"
+                + "    },\n"
+                + "    \"source\": { \n"
+                + "      \"version\": \"1.9.6.Final\",\n"
+                + "      \"connector\": \"mysql\",\n"
+                + "      \"name\": \"mysql-server-1\",\n"
+                + "      \"ts_ms\": 0,\n"
+                + "      \"pkNames\":[\"id\", \"first_name\"],"
+                + "      \"snapshot\": false,\n"
+                + "      \"db\": \"inventory\",\n"
+                + "      \"table\": \"customers\",\n"
+                + "      \"server_id\": 0,\n"
+                + "      \"gtid\": null,\n"
+                + "      \"file\": \"mysql-bin.000003\",\n"
+                + "      \"pos\": 154,\n"
+                + "      \"row\": 0,\n"
+                + "      \"thread\": 7,\n"
+                + "      \"query\": \"INSERT INTO customers (first_name, last_name, email)"
+                + " VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')\"\n"
+                + "    }\n"
+                + "  }\n"
+                + "}";
+    }
+
+    @Override
+    protected Map<String, String> getExpectedValues() {
+        Map<String, String> expectedValues = new HashMap<>();
+        expectedValues.put("${source.db}${source.table}", "inventorycustomers");
+        expectedValues.put("${source.db}_${source.table}", "inventory_customers");
+        expectedValues.put("prefix_${source.db}_${source.table}_suffix", "prefix_inventory_customers_suffix");
+        expectedValues.put("${ \t source.db \t }${ source.table }", "inventorycustomers");
+        expectedValues.put("${source.db}_${source.table}_${id}_${first_name}", "inventory_customers_1004_Anne");
+        return expectedValues;
+    }
+
+    @Test
+    @SuppressWarnings({"unchecked"})
+    public void testExtractPrimaryKey() throws IOException {
+        JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+                .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+        List<String> primaryKeys = getDynamicSchemaFormat().extractPrimaryKeyNames(rootNode);
+        List<String> values = getDynamicSchemaFormat().extractValues(rootNode, primaryKeys.toArray(new String[]{}));
+        Assert.assertEquals(values, Arrays.asList("1004", "Anne"));
+    }
+
+    @Test
+    public void testExtractPhysicalData() throws IOException {
+        JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+                .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+        Assert.assertNotNull(((JsonDynamicSchemaFormat) getDynamicSchemaFormat()).getPhysicalData(rootNode));
+    }
+
+    @Test
+    public void testExtractRowType() throws IOException {
+        JsonNode rootNode = (JsonNode) getDynamicSchemaFormat()
+                .deserialize(getSource().getBytes(StandardCharsets.UTF_8));
+        String[] names = new String[]{"id", "first_name", "last_name", "email"};
+        LogicalType[] types = new LogicalType[]{
+                new IntType(false),
+                new VarCharType(false, 1),
+                new VarCharType(),
+                new VarCharType()
+        };
+        RowType rowType = RowType.of(true, types, names);
+        Assert.assertEquals(getDynamicSchemaFormat().extractSchema(rootNode), rowType);
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    @Override
+    protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
+        return DebeziumJsonDynamicSchemaFormat.getInstance();
+    }
+}