You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/04/28 05:38:50 UTC

[incubator-seatunnel] branch dev updated: Fix the processing bug of abnormal parsing method of kafkaSource format. (#4687)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 228257b2e Fix the processing bug of abnormal parsing method of kafkaSource format. (#4687)
228257b2e is described below

commit 228257b2e256a83e18d3d5a9f9c34720223d65f9
Author: lightzhao <40...@users.noreply.github.com>
AuthorDate: Fri Apr 28 13:38:41 2023 +0800

    Fix the processing bug of abnormal parsing method of kafkaSource format. (#4687)
---
 .../seatunnel/api/serialization/DeserializationSchema.java     | 10 +++-------
 .../connectors/seatunnel/kafka/source/KafkaSourceReader.java   |  4 ++--
 2 files changed, 5 insertions(+), 9 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
index 745e517a2..d7f7abb1a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
@@ -35,13 +35,9 @@ public interface DeserializationSchema<T> extends Serializable {
     T deserialize(byte[] message) throws IOException;
 
     default void deserialize(byte[] message, Collector<T> out) throws IOException {
-        try {
-            T deserialize = deserialize(message);
-            if (deserialize != null) {
-                out.collect(deserialize);
-            }
-        } catch (IOException e) {
-            throw new IOException(e);
+        T deserialize = deserialize(message);
+        if (deserialize != null) {
+            out.collect(deserialize);
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 07fe71a60..226fded24 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -152,13 +152,13 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
                                                         try {
                                                             deserializationSchema.deserialize(
                                                                     record.value(), output);
-                                                        } catch (IOException e) {
+                                                        } catch (Exception e) {
                                                             if (this.messageFormatErrorHandleWay
                                                                     == MessageFormatErrorHandleWay
                                                                             .SKIP) {
                                                                 log.warn(
                                                                         "Deserialize message failed, skip this message, message: {}",
-                                                                        record.value());
+                                                                        new String(record.value()));
                                                                 continue;
                                                             }
                                                             throw e;