You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by "TaoZex (via GitHub)" <gi...@apache.org> on 2023/03/02 17:34:54 UTC

[GitHub] [incubator-seatunnel] TaoZex commented on a diff in pull request #4225: [Feature][formats][ogg] Support read ogg format message #4201

TaoZex commented on code in PR #4225:
URL: https://github.com/apache/incubator-seatunnel/pull/4225#discussion_r1123476979


##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -106,8 +106,7 @@ Kafka distinguishes different transactions by different transactionId. This para
 
 ### format
 
-Data format. The default format is json. Optional text format. The default field separator is ",".
-If you customize the delimiter, add the "field_delimiter" option.
+If you use ogg format, please refer to [ogg-json](../formats/ogg-json.md) for details.

Review Comment:
   Do not delete the previous documentation; otherwise users will not know how to use the other formats.



##########
docs/en/connector-v2/source/kafka.md:
##########
@@ -72,8 +72,7 @@ The structure of the data, including field names and field types.
 
 ## format
 
-Data format. The default format is json. Optional text format. The default field separator is ", ".
-If you customize the delimiter, add the "field_delimiter" option.
+If you use ogg format, please refer to [ogg-json](../formats/ogg-json.md) for details.

Review Comment:
   Same as above.



##########
seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.ogg;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+import static java.lang.String.format;
+
+public class OggJsonDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String FIELD_TYPE = "op_type";
+
+    private static final String FIELD_DATABASE_TABLE = "table";
+
+    private static final String DATA_BEFORE = "before"; // BEFORE
+
+    private static final String DATA_AFTER = "after"; // AFTER
+
+    private static final String OP_INSERT = "I"; // INSERT
+
+    private static final String OP_UPDATE = "U"; // UPDATE
+
+    private static final String OP_DELETE = "D"; // DELETE
+
+    private static final String REPLICA_IDENTITY_EXCEPTION =
+            "The \"before\" field of %s message is null, "
+                    + "if you are using Ogg Postgres Connector, "
+                    + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
+
+    private String database;
+
+    private String table;
+
+    /** Names of fields. */
+    private final String[] fieldNames;
+
+    /** Number of fields. */
+    private final int fieldCount;
+
+    private boolean ignoreParseErrors;
+
+    /** Pattern of the specific database. */
+    private final Pattern databasePattern;
+
+    /** Pattern of the specific table. */
+    private final Pattern tablePattern;
+
+    private final JsonDeserializationSchema jsonDeserializer;
+
+    private final SeaTunnelRowType physicalRowType;
+
+    public OggJsonDeserializationSchema(
+            SeaTunnelRowType physicalRowType,
+            String database,
+            String table,
+            boolean ignoreParseErrors) {
+        this.physicalRowType = physicalRowType;
+        final SeaTunnelRowType jsonRowType = createJsonRowType(physicalRowType);
+        this.jsonDeserializer =
+                new JsonDeserializationSchema(false, ignoreParseErrors, jsonRowType);
+        this.database = database;
+        this.table = table;
+        this.fieldNames = physicalRowType.getFieldNames();
+        this.fieldCount = physicalRowType.getTotalFields();
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.databasePattern = database == null ? null : Pattern.compile(database);
+        this.tablePattern = table == null ? null : Pattern.compile(table);
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        throw new UnsupportedOperationException();

Review Comment:
   Add a note (a message or comment) explaining why this exception is thrown.



##########
docs/en/connector-v2/formats/ogg-json.md:
##########
@@ -0,0 +1,93 @@
+# Ogg Format
+
+[Oracle GoldenGate](https://www.oracle.com/integration/goldengate/) (a.k.a. ogg) is a managed service providing a real-time data mesh platform, which uses replication to keep data highly available and enables real-time analysis. Customers can design, execute, and monitor their data replication and stream data processing solutions without the need to allocate or manage compute environments. Ogg provides a format schema for changelogs and supports serializing messages as JSON.
+
+SeaTunnel can interpret Ogg JSON messages as INSERT/UPDATE/DELETE messages in the SeaTunnel system. This feature is useful in many cases, such as:
+
+        synchronizing incremental data from databases to other systems
+        auditing logs
+        real-time materialized views on databases
+        temporal join changing history of a database table and so on.
+
+SeaTunnel can also encode the INSERT/UPDATE/DELETE messages in SeaTunnel as Ogg JSON messages and emit them to storage such as Kafka. However, SeaTunnel currently can’t combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, SeaTunnel encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Ogg messages.
+
+# Format Options
+
+|            option            | default | required |                                                                                                Description                                                                                                 |
+|------------------------------|---------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| format                       | (none)  | yes      | Specify what format to use, here should be 'ogg-json'.                                                                                                                                                     |
+| ogg-json.ignore-parse-errors | false   | no       | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors.                                                                                                       |
+| ogg-json.database.include    | (none)  | no       | An optional regular expression to only read the specific databases changelog rows by regular matching the "database" meta field in the Ogg record. The pattern string is compatible with Java's Pattern.   |
+| ogg-json.table.include       | (none)  | no       | An optional regular expression to only read the specific tables changelog rows by regular matching the "table" meta field in the Ogg record. The pattern string is compatible with Java's Pattern.         |
+
+# How to use Ogg format
+
+## Kafka usage example
+
+Ogg provides a unified format for changelogs. Here is a simple example of an update operation captured from an Oracle products table:
+
+```bash
+{
+  "before": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.18
+  },
+  "after": {
+    "id": 111,
+    "name": "scooter",
+    "description": "Big 2-wheel scooter",
+    "weight": 5.15
+  },
+  "op_type": "U",
+  "op_ts": "2020-05-13 15:40:06.000000",
+  "current_ts": "2020-05-13 15:40:07.000000",
+  "primary_keys": [
+    "id"
+  ],
+  "pos": "00000000000000000000143",
+  "table": "PRODUCTS"
+}
+```
+
+Note: please refer to  documentation about the meaning of each fields.

Review Comment:
   ```suggestion
   Note: please refer to the documentation for the meaning of each field.
   ```



##########
seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/ogg/OggJsonSerDeSchemaTest.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.json.ogg;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class OggJsonSerDeSchemaTest {
+
+    private static final SeaTunnelRowType PHYSICAL_DATA_TYPE =
+            new SeaTunnelRowType(
+                    new String[] {"id", "name", "description", "weight"},
+                    new SeaTunnelDataType[] {INT_TYPE, STRING_TYPE, STRING_TYPE, FLOAT_TYPE});
+
+    @Test
+    public void testFilteringTables() throws Exception {
+        List<String> lines = readLines("ogg-data-filter-table.txt");
+        OggJsonDeserializationSchema deserializationSchema =
+                new OggJsonDeserializationSchema.Builder(PHYSICAL_DATA_TYPE)
+                        .setDatabase("^OG.*")
+                        .setTable("^TBL.*")
+                        .build();
+        runTest(lines, deserializationSchema);
+    }
+
+    @Test
+    public void testDeserializeNullRow() throws Exception {
+        final OggJsonDeserializationSchema deserializationSchema =
+                createOggJsonDeserializationSchema(null, null);
+        final SimpleCollector collector = new SimpleCollector();
+
+        deserializationSchema.deserialize(null, collector);
+        assertEquals(0, collector.list.size());
+    }
+
+    public void runTest(List<String> lines, OggJsonDeserializationSchema deserializationSchema)
+            throws IOException {
+        SimpleCollector collector = new SimpleCollector();
+        for (String line : lines) {
+            deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector);
+        }
+
+        List<String> expected =
+                Arrays.asList(
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[101, scooter, Small 2-wheel scooter, 3.14]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[102, car battery, 12V car battery, 8.1]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[104, hammer, 12oz carpenter's hammer, 0.75]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[105, hammer, 14oz carpenter's hammer, 0.875]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[106, hammer, 16oz carpenter's hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[107, rocks, box of assorted rocks, 5.3]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[108, jacket, water resistent black wind breaker, 0.1]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[109, spare tire, 24 inch spare tire, 22.2]}",
+                        "SeaTunnelRow{tableId=-1, kind=-U, fields=[106, hammer, 16oz carpenter's hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=-1, kind=+U, fields=[106, hammer, 18oz carpenter hammer, 1.0]}",
+                        "SeaTunnelRow{tableId=-1, kind=-U, fields=[107, rocks, box of assorted rocks, 5.3]}",
+                        "SeaTunnelRow{tableId=-1, kind=+U, fields=[107, rocks, box of assorted rocks, 5.1]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[110, jacket, water resistent white wind breaker, 0.2]}",
+                        "SeaTunnelRow{tableId=-1, kind=+I, fields=[111, scooter, Big 2-wheel scooter , 5.18]}",
+                        "SeaTunnelRow{tableId=-1, kind=-U, fields=[110, jacket, water resistent white wind breaker, 0.2]}",
+                        "SeaTunnelRow{tableId=-1, kind=+U, fields=[110, jacket, new water resistent white wind breaker, 0.5]}",
+                        "SeaTunnelRow{tableId=-1, kind=-U, fields=[111, scooter, Big 2-wheel scooter , 5.18]}",
+                        "SeaTunnelRow{tableId=-1, kind=+U, fields=[111, scooter, Big 2-wheel scooter , 5.17]}",
+                        "SeaTunnelRow{tableId=-1, kind=-D, fields=[111, scooter, Big 2-wheel scooter , 5.17]}");
+        List<String> actual =
+                collector.list.stream().map(Object::toString).collect(Collectors.toList());
+        assertEquals(expected, actual);
+
+        // test Serialization
+        OggJsonSerializationSchema serializationSchema =
+                new OggJsonSerializationSchema(PHYSICAL_DATA_TYPE);
+        List<String> result = new ArrayList<>();
+        for (SeaTunnelRow rowData : collector.list) {
+            result.add(new String(serializationSchema.serialize(rowData), StandardCharsets.UTF_8));
+        }
+
+        List<String> expectedResult =
+                Arrays.asList(
+                        "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"type\":\"DELETE\"}",
+                        "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter hammer\",\"weight\":1.0},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"type\":\"DELETE\"}",
+                        "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.1},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"type\":\"DELETE\"}",
+                        "{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water resistent white wind breaker\",\"weight\":0.5},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"type\":\"DELETE\"}",
+                        "{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"type\":\"INSERT\"}",
+                        "{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"type\":\"DELETE\"}");
+
+        assertEquals(expectedResult, result);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Utilities
+    // --------------------------------------------------------------------------------------------
+
+    private OggJsonDeserializationSchema createOggJsonDeserializationSchema(
+            String database, String table) {
+        return OggJsonDeserializationSchema.builder(PHYSICAL_DATA_TYPE)
+                .setDatabase(database)
+                .setTable(table)
+                .setIgnoreParseErrors(false)
+                .build();
+    }
+
+    private static List<String> readLines(String resource) throws IOException {
+        final URL url = OggJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
+        assert url != null;

Review Comment:
   ```suggestion
           Assertions.assertNotNull(url);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org