You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/04/28 05:38:50 UTC
[incubator-seatunnel] branch dev updated: Fix the processing bug of abnormal parsing method of kafkaSource format. (#4687)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 228257b2e Fix the processing bug of abnormal parsing method of kafkaSource format. (#4687)
228257b2e is described below
commit 228257b2e256a83e18d3d5a9f9c34720223d65f9
Author: lightzhao <40...@users.noreply.github.com>
AuthorDate: Fri Apr 28 13:38:41 2023 +0800
Fix the processing bug of abnormal parsing method of kafkaSource format. (#4687)
---
.../seatunnel/api/serialization/DeserializationSchema.java | 10 +++-------
.../connectors/seatunnel/kafka/source/KafkaSourceReader.java | 4 ++--
2 files changed, 5 insertions(+), 9 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
index 745e517a2..d7f7abb1a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
@@ -35,13 +35,9 @@ public interface DeserializationSchema<T> extends Serializable {
T deserialize(byte[] message) throws IOException;
default void deserialize(byte[] message, Collector<T> out) throws IOException {
- try {
- T deserialize = deserialize(message);
- if (deserialize != null) {
- out.collect(deserialize);
- }
- } catch (IOException e) {
- throw new IOException(e);
+ T deserialize = deserialize(message);
+ if (deserialize != null) {
+ out.collect(deserialize);
}
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 07fe71a60..226fded24 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -152,13 +152,13 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
try {
deserializationSchema.deserialize(
record.value(), output);
- } catch (IOException e) {
+ } catch (Exception e) {
if (this.messageFormatErrorHandleWay
== MessageFormatErrorHandleWay
.SKIP) {
log.warn(
"Deserialize message failed, skip this message, message: {}",
- record.value());
+ new String(record.value()));
continue;
}
throw e;