You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/31 11:06:21 UTC

[inlong] 02/02: [INLONG-5754][Sort] Fix Pulsar connector deserialize complex format with multiple data will cause data loss (#5755)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d20a0ae69309e6c1045cacb04a16a628c941350c
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Wed Aug 31 19:04:56 2022 +0800

    [INLONG-5754][Sort] Fix Pulsar connector deserialize complex format with multiple data will cause data loss (#5755)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../sort/pulsar/withoutadmin/PulsarFetcher.java    |  6 ++--
 .../sort/pulsar/withoutadmin/ReaderThread.java     | 42 +++++++++++++++++++---
 .../inlongmsg/InLongMsgDeserializationSchema.java  |  2 ++
 3 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
index cb5933d56..bcaa66301 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
@@ -50,6 +50,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
@@ -398,14 +399,15 @@ public class PulsarFetcher<T> {
      * @param pulsarEventTimestamp The timestamp of the pulsar record
      */
     protected void emitRecordsWithTimestamps(
-            T record,
+            Queue<T> records,
             PulsarTopicState<T> partitionState,
             MessageId offset,
             long pulsarEventTimestamp) {
         // emit the records, using the checkpoint lock to guarantee
         // atomicity of record emission and offset state update
         synchronized (checkpointLock) {
-            if (record != null) {
+            T record;
+            while ((record = records.poll()) != null) {
                 long timestamp = partitionState.extractTimestamp(record, pulsarEventTimestamp);
                 sourceContext.collectWithTimestamp(record, timestamp);
 
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
index cff23fc00..af1fac9fc 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicState;
 import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
 import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
+import org.apache.flink.util.Collector;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -36,7 +37,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -59,6 +62,7 @@ public class ReaderThread<T> extends Thread {
     protected boolean excludeMessageId = false;
     private boolean failOnDataLoss = true;
     private boolean useEarliestWhenDataLoss = false;
+    private final PulsarCollector pulsarCollector;
 
     protected volatile boolean running = true;
     protected volatile boolean closed = false;
@@ -85,6 +89,7 @@ public class ReaderThread<T> extends Thread {
 
         this.topicRange = state.getTopicRange();
         this.startMessageId = state.getOffset();
+        this.pulsarCollector = new PulsarCollector();
     }
 
     public ReaderThread(
@@ -151,12 +156,13 @@ public class ReaderThread<T> extends Thread {
 
     protected void emitRecord(Message<T> message) throws IOException {
         MessageId messageId = message.getMessageId();
-        final T record = deserializer.deserialize(message);
-        if (deserializer.isEndOfStream(record)) {
+        deserializer.deserialize(message, pulsarCollector);
+
+        owner.emitRecordsWithTimestamps(pulsarCollector.getRecords(), state, messageId, message.getEventTime());
+        if (pulsarCollector.isEndOfStreamSignalled()) {
+            // end of stream signaled
             running = false;
-            return;
         }
-        owner.emitRecordsWithTimestamps(record, state, messageId, message.getEventTime());
     }
 
     public void cancel() throws IOException {
@@ -219,4 +225,32 @@ public class ReaderThread<T> extends Thread {
                             r.getClass().toString()));
         }
     }
+
+    private class PulsarCollector implements Collector<T> {
+        private final Queue<T> records = new ArrayDeque<>();
+
+        private boolean endOfStreamSignalled = false;
+
+        @Override
+        public void collect(T record) {
+            // do not emit subsequent elements if the end of the stream reached
+            if (endOfStreamSignalled || deserializer.isEndOfStream(record)) {
+                endOfStreamSignalled = true;
+                return;
+            }
+            records.add(record);
+        }
+
+        public Queue<T> getRecords() {
+            return records;
+        }
+
+        public boolean isEndOfStreamSignalled() {
+            return endOfStreamSignalled;
+        }
+
+        @Override
+        public void close() {
+        }
+    }
 }
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
index ba41a7447..d14c982e9 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
@@ -18,6 +18,7 @@
 
 package org.apache.inlong.sort.formats.inlongmsg;
 
+import com.esotericsoftware.minlog.Log;
 import com.google.common.base.Objects;
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -82,6 +83,7 @@ public class InLongMsgDeserializationSchema implements DeserializationSchema<Row
                 head = InLongMsgUtils.parseHead(attr);
             } catch (Throwable t) {
                 if (ignoreErrors) {
+                    Log.warn("Ignore inlong msg attr({})parse error.", attr, t);
                     continue;
                 }
                 throw new IOException(