Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/03/24 12:53:50 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2][Kafka] Kafka source supports data deserialization failure skipping (#4364)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e1ed22b15 [Feature][Connector-V2][Kafka] Kafka source supports data deserialization failure skipping (#4364)
e1ed22b15 is described below

commit e1ed22b1531eb11bbba9898f55d0568d386624d3
Author: dylandai <54...@qq.com>
AuthorDate: Fri Mar 24 20:53:42 2023 +0800

    [Feature][Connector-V2][Kafka] Kafka source supports data deserialization failure skipping (#4364)
    
    * [Feature][Connector-V2][Kafka] Kafka source supports data deserialization failure skipping #4361
    
    * change log level and add changelog
    
    1. change log level
    2. add kafka source changelog
    
    * add changelog
    
    * add e2e
    
    * add e2e case
    
    * [Feature][Connector-V2][Kafka] Fix code style
    
    * Update seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
    
    Co-authored-by: Tyrantlucifer <ty...@apache.org>
    
    * fix code-review
    
    * add e2e case for format_error_handle_way
    
    1. format_error_handle_way = fail:
       invalid data causes an exception to be thrown
    2. format_error_handle_way = skip:
       invalid data is skipped
    
    * unify exception
    
    * change e2e config
    
    * fix e2e test case
    
    * Update docs/en/connector-v2/source/kafka.md
    
    Co-authored-by: Eric <ga...@gmail.com>
    
    * Update kafka.md
    
    fix code-style
    
    ---------
    
    Co-authored-by: tyrantlucifer <ty...@gmail.com>
    Co-authored-by: Tyrantlucifer <ty...@apache.org>
    Co-authored-by: Eric <ga...@gmail.com>
---
 docs/en/connector-v2/source/kafka.md               |  8 ++++++
 .../api/serialization/DeserializationSchema.java   | 10 +++++--
 .../connectors/seatunnel/kafka/config/Config.java  |  9 ++++++
 .../kafka/config/MessageFormatErrorHandleWay.java  | 23 ++++++++++++++++
 .../seatunnel/kafka/source/KafkaSource.java        | 20 +++++++++++++-
 .../seatunnel/kafka/source/KafkaSourceReader.java  | 22 +++++++++++++--
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 32 ++++++++++++++++++++++
 ...e_format_error_handle_way_fail_to_console.conf} |  6 ++--
 ...e_format_error_handle_way_skip_to_console.conf} |  6 ++--
 .../resources/kafkasource_json_to_console.conf     |  1 +
 .../resources/kafkasource_text_to_console.conf     |  1 +
 11 files changed, 127 insertions(+), 11 deletions(-)

diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index 75d910341..9506712c2 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -28,6 +28,7 @@ Source connector for Apache Kafka.
 | common-options                      | config  | no       | -                        |
 | schema                              |         | no       | -                        |
 | format                              | String  | no       | json                     |
+| format_error_handle_way             | String  | no       | fail                     |
 | field_delimiter                     | String  | no       | ,                        |
 | start_mode                          | String  | no       | group_offsets            |
 | start_mode.offsets                  |         | no       |                          |
@@ -75,6 +76,12 @@ The structure of the data, including field names and field types.
 Data format. The default format is json. Optional text format. The default field separator is ", ".
 If you customize the delimiter, add the "field_delimiter" option.
 
+## format_error_handle_way
+
+How to handle data format errors. The default value is fail; the optional values are fail and skip.
+When fail is selected, a data format error fails the job and an exception is thrown.
+When skip is selected, the line with the data format error is skipped.
+
 ## field_delimiter
 
 Customize the field delimiter for data format.
@@ -218,4 +225,5 @@ source {
 - [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
 - [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
 - [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/incubator-seatunnel/pull/3810))
+- [Feature] Kafka source supports data deserialization failure skipping([4364](https://github.com/apache/incubator-seatunnel/pull/4364))
 
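
For reference, a minimal job config using the new option might look like the following sketch; the broker address, topic name, and schema are illustrative placeholders, not values taken from this commit:

```hocon
source {
  Kafka {
    bootstrap.servers = "localhost:9092"   # placeholder address
    topic = "example_topic"                # placeholder topic
    format = json
    # Skip records that fail deserialization instead of failing the job:
    format_error_handle_way = skip
    schema = {
      fields {
        id = bigint
        name = string
      }
    }
  }
}
```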
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
index d7f7abb1a..745e517a2 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
@@ -35,9 +35,13 @@ public interface DeserializationSchema<T> extends Serializable {
     T deserialize(byte[] message) throws IOException;
 
     default void deserialize(byte[] message, Collector<T> out) throws IOException {
-        T deserialize = deserialize(message);
-        if (deserialize != null) {
-            out.collect(deserialize);
+        try {
+            T deserialize = deserialize(message);
+            if (deserialize != null) {
+                out.collect(deserialize);
+            }
+        } catch (IOException e) {
+            throw new IOException(e);
         }
     }
 
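
The `DeserializationSchema` change above surfaces parse failures to the caller as `IOException`, so the caller can decide whether to skip or fail. A self-contained sketch of the same pattern, using simplified stand-in types rather than SeaTunnel's actual interfaces:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class DeserializationSketch {

    // Simplified stand-in for SeaTunnel's DeserializationSchema: one method
    // parses a raw record; the default method forwards non-null results to a
    // collector and lets IOException propagate to the caller.
    interface SimpleDeserializationSchema<T> {
        T deserialize(byte[] message) throws IOException;

        default void deserialize(byte[] message, Consumer<T> out) throws IOException {
            T value = deserialize(message);
            if (value != null) {
                out.accept(value);
            }
        }
    }

    // Parses UTF-8 integers; malformed input surfaces as IOException.
    static final SimpleDeserializationSchema<Integer> INT_SCHEMA =
            bytes -> {
                String s = new String(bytes, StandardCharsets.UTF_8);
                try {
                    return Integer.valueOf(s);
                } catch (NumberFormatException e) {
                    throw new IOException("Malformed record: " + s, e);
                }
            };

    // Deserialize each record, collecting results; the first malformed
    // record aborts with IOException (the FAIL behavior).
    static List<Integer> parseAll(String... records) throws IOException {
        List<Integer> out = new ArrayList<>();
        for (String r : records) {
            INT_SCHEMA.deserialize(r.getBytes(StandardCharsets.UTF_8), out::add);
        }
        return out;
    }
}
```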
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 60ce08038..ff051b96d 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -155,4 +155,13 @@ public class Config {
                     .defaultValue(-1L)
                     .withDescription(
                             "The interval for dynamically discovering topics and partitions.");
+
+    public static final Option<MessageFormatErrorHandleWay> MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION =
+            Options.key("format_error_handle_way")
+                    .enumType(MessageFormatErrorHandleWay.class)
+                    .defaultValue(MessageFormatErrorHandleWay.FAIL)
+                    .withDescription(
+                            "How to handle data format errors. The default value is fail, and the optional values are fail and skip. "
+                                    + "When fail is selected, a data format error fails the job and an exception is thrown. "
+                                    + "When skip is selected, the line with the data format error is skipped.");
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormatErrorHandleWay.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormatErrorHandleWay.java
new file mode 100644
index 000000000..bd61481f7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormatErrorHandleWay.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+
+public enum MessageFormatErrorHandleWay {
+    FAIL,
+    SKIP,
+}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index e760dbe6e..741d75216 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -41,6 +41,7 @@ import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
@@ -66,6 +67,7 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIEL
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
@@ -83,6 +85,8 @@ public class KafkaSource
     private SeaTunnelRowType typeInfo;
     private JobContext jobContext;
     private long discoveryIntervalMillis = KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.defaultValue();
+    private MessageFormatErrorHandleWay messageFormatErrorHandleWay =
+            MessageFormatErrorHandleWay.FAIL;
 
     @Override
     public Boundedness getBoundedness() {
@@ -186,6 +190,19 @@ public class KafkaSource
                                     this.metadata.getProperties().put(key, value.unwrapped()));
         }
 
+        if (config.hasPath(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION.key())) {
+            MessageFormatErrorHandleWay formatErrorWayOption =
+                    ReadonlyConfig.fromConfig(config).get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
+            switch (formatErrorWayOption) {
+                case FAIL:
+                case SKIP:
+                    this.messageFormatErrorHandleWay = formatErrorWayOption;
+                    break;
+                default:
+                    break;
+            }
+        }
+
         setDeserialization(config);
     }
 
@@ -197,7 +214,8 @@ public class KafkaSource
     @Override
     public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
             SourceReader.Context readerContext) throws Exception {
-        return new KafkaSourceReader(this.metadata, deserializationSchema, readerContext);
+        return new KafkaSourceReader(
+                this.metadata, deserializationSchema, readerContext, messageFormatErrorHandleWay);
     }
 
     @Override
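
The `prepare` change above reads the option only when it is present and otherwise keeps the `FAIL` default. A standalone sketch of that resolution step, using a plain `Map` as a hypothetical stand-in for SeaTunnel's config classes:

```java
import java.util.Locale;
import java.util.Map;

class FormatErrorOptionSketch {
    enum MessageFormatErrorHandleWay { FAIL, SKIP }

    // Mirrors what KafkaSource.prepare does conceptually: read the
    // "format_error_handle_way" key if present, otherwise fall back to FAIL.
    static MessageFormatErrorHandleWay resolve(Map<String, String> config) {
        String raw = config.get("format_error_handle_way");
        if (raw == null) {
            return MessageFormatErrorHandleWay.FAIL;
        }
        // Option values are case-insensitive in this sketch.
        return MessageFormatErrorHandleWay.valueOf(raw.toUpperCase(Locale.ROOT));
    }
}
```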
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 3e602aa7f..07fe71a60 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
 import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 
@@ -62,6 +63,7 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
     private final Map<TopicPartition, KafkaConsumerThread> consumerThreadMap;
     private final ExecutorService executorService;
     private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;
 
     private final LinkedBlockingQueue<KafkaSourceSplit> pendingPartitionsQueue;
 
@@ -70,9 +72,11 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
     KafkaSourceReader(
             ConsumerMetadata metadata,
             DeserializationSchema<SeaTunnelRow> deserializationSchema,
-            SourceReader.Context context) {
+            Context context,
+            MessageFormatErrorHandleWay messageFormatErrorHandleWay) {
         this.metadata = metadata;
         this.context = context;
+        this.messageFormatErrorHandleWay = messageFormatErrorHandleWay;
         this.sourceSplits = new HashSet<>();
         this.deserializationSchema = deserializationSchema;
         this.consumerThreadMap = new ConcurrentHashMap<>();
@@ -145,8 +149,20 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
                                                     for (ConsumerRecord<byte[], byte[]> record :
                                                             recordList) {
 
-                                                        deserializationSchema.deserialize(
-                                                                record.value(), output);
+                                                        try {
+                                                            deserializationSchema.deserialize(
+                                                                    record.value(), output);
+                                                        } catch (IOException e) {
+                                                            if (this.messageFormatErrorHandleWay
+                                                                    == MessageFormatErrorHandleWay
+                                                                            .SKIP) {
+                                                                log.warn(
+                                                                        "Deserialize message failed, skip this message, message: {}",
+                                                                        record.value());
+                                                                continue;
+                                                            }
+                                                            throw e;
+                                                        }
 
                                                         if (Boundedness.BOUNDED.equals(
                                                                         context.getBoundedness())
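
The reader change above is the core of the feature: on a deserialization failure, `SKIP` drops the record (logging a warning) while `FAIL` rethrows. A simplified, self-contained version of that loop, with integer parsing standing in for the real deserializer:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class SkipOrFailSketch {
    enum HandleWay { FAIL, SKIP }

    // Simplified version of the reader's poll loop: try to deserialize each
    // record; on failure either skip the record (SKIP) or rethrow (FAIL).
    static List<Integer> consume(List<String> records, HandleWay way) throws IOException {
        List<Integer> out = new ArrayList<>();
        for (String record : records) {
            try {
                // Stand-in for deserializationSchema.deserialize(record, output).
                out.add(Integer.parseInt(record));
            } catch (NumberFormatException e) {
                if (way == HandleWay.SKIP) {
                    // The real reader logs this at WARN level before moving on.
                    continue;
                }
                throw new IOException("Deserialize message failed: " + record, e);
            }
        }
        return out;
    }
}
```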
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index f9fb72711..dee8cb8c4 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -205,6 +205,38 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }
 
+    @TestTemplate
+    public void testSourceKafkaJsonFormatErrorHandleWaySkipToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        "test_topic_error_message",
+                        SEATUNNEL_ROW_TYPE,
+                        DEFAULT_FORMAT,
+                        DEFAULT_FIELD_DELIMITER);
+        generateTestData(row -> serializer.serializeRow(row), 0, 100);
+        Container.ExecResult execResult =
+                container.executeJob(
+                        "/kafka/kafkasource_format_error_handle_way_skip_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    @TestTemplate
+    public void testSourceKafkaJsonFormatErrorHandleWayFailToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        "test_topic_error_message",
+                        SEATUNNEL_ROW_TYPE,
+                        DEFAULT_FORMAT,
+                        DEFAULT_FIELD_DELIMITER);
+        generateTestData(row -> serializer.serializeRow(row), 0, 100);
+        Container.ExecResult execResult =
+                container.executeJob(
+                        "/kafka/kafkasource_format_error_handle_way_fail_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
     @TestTemplate
     public void testSourceKafka(TestContainer container) throws IOException, InterruptedException {
         testKafkaLatestToConsole(container);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
similarity index 94%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
index 129149623..19c1331f7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
@@ -33,9 +33,11 @@ env {
 source {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"
-    topic = "test_topic_json"
+    topic = "test_topic_error_message"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
+    format_error_handle_way = fail
+#     kafka.auto.offset.reset = "earliest"
     schema = {
       fields {
            id = bigint
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
similarity index 94%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
index 129149623..012f93ed3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
@@ -33,9 +33,11 @@ env {
 source {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"
-    topic = "test_topic_json"
+    topic = "test_topic_error_message"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
+    format_error_handle_way = skip
+#     kafka.auto.offset.reset = "earliest"
     schema = {
       fields {
            id = bigint
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
index 129149623..ace91e2d3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
@@ -36,6 +36,7 @@ source {
     topic = "test_topic_json"
     result_table_name = "kafka_table"
     kafka.auto.offset.reset = "earliest"
+    format_error_handle_way = skip
     schema = {
       fields {
            id = bigint
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
index 67879791b..af6db138b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
@@ -36,6 +36,7 @@ source {
     topic = "test_topic_text"
     result_table_name = "kafka_table"
     kafka.auto.offset.reset = "earliest"
+    format_error_handle_way = fail
     schema = {
       fields {
            id = bigint