You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/31 11:05:00 UTC
[inlong] branch master updated: [INLONG-5754][Sort] Fix Pulsar connector deserialize complex format with multiple data will cause data loss (#5755)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 28e82729e [INLONG-5754][Sort] Fix Pulsar connector deserialize complex format with multiple data will cause data loss (#5755)
28e82729e is described below
commit 28e82729e28e4a16fe58b5f9588f0d366d4116b2
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Wed Aug 31 19:04:56 2022 +0800
[INLONG-5754][Sort] Fix Pulsar connector deserialize complex format with multiple data will cause data loss (#5755)
Co-authored-by: thesumery <15...@qq.com>
---
.../sort/pulsar/withoutadmin/PulsarFetcher.java | 6 ++--
.../sort/pulsar/withoutadmin/ReaderThread.java | 42 +++++++++++++++++++---
.../inlongmsg/InLongMsgDeserializationSchema.java | 2 ++
3 files changed, 44 insertions(+), 6 deletions(-)
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
index cb5933d56..bcaa66301 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
@@ -50,6 +50,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
@@ -398,14 +399,15 @@ public class PulsarFetcher<T> {
* @param pulsarEventTimestamp The timestamp of the pulsar record
*/
protected void emitRecordsWithTimestamps(
- T record,
+ Queue<T> records,
PulsarTopicState<T> partitionState,
MessageId offset,
long pulsarEventTimestamp) {
// emit the records, using the checkpoint lock to guarantee
// atomicity of record emission and offset state update
synchronized (checkpointLock) {
- if (record != null) {
+ T record;
+ while ((record = records.poll()) != null) {
long timestamp = partitionState.extractTimestamp(record, pulsarEventTimestamp);
sourceContext.collectWithTimestamp(record, timestamp);
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
index cff23fc00..af1fac9fc 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicState;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
+import org.apache.flink.util.Collector;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -36,7 +37,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayDeque;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.TimeUnit;
/**
@@ -59,6 +62,7 @@ public class ReaderThread<T> extends Thread {
protected boolean excludeMessageId = false;
private boolean failOnDataLoss = true;
private boolean useEarliestWhenDataLoss = false;
+ private final PulsarCollector pulsarCollector;
protected volatile boolean running = true;
protected volatile boolean closed = false;
@@ -85,6 +89,7 @@ public class ReaderThread<T> extends Thread {
this.topicRange = state.getTopicRange();
this.startMessageId = state.getOffset();
+ this.pulsarCollector = new PulsarCollector();
}
public ReaderThread(
@@ -151,12 +156,13 @@ public class ReaderThread<T> extends Thread {
protected void emitRecord(Message<T> message) throws IOException {
MessageId messageId = message.getMessageId();
- final T record = deserializer.deserialize(message);
- if (deserializer.isEndOfStream(record)) {
+ deserializer.deserialize(message, pulsarCollector);
+
+ owner.emitRecordsWithTimestamps(pulsarCollector.getRecords(), state, messageId, message.getEventTime());
+ if (pulsarCollector.isEndOfStreamSignalled()) {
+ // end of stream signaled
running = false;
- return;
}
- owner.emitRecordsWithTimestamps(record, state, messageId, message.getEventTime());
}
public void cancel() throws IOException {
@@ -219,4 +225,32 @@ public class ReaderThread<T> extends Thread {
r.getClass().toString()));
}
}
+
+ private class PulsarCollector implements Collector<T> {
+ private final Queue<T> records = new ArrayDeque<>();
+
+ private boolean endOfStreamSignalled = false;
+
+ @Override
+ public void collect(T record) {
+ // do not emit subsequent elements if the end of the stream reached
+ if (endOfStreamSignalled || deserializer.isEndOfStream(record)) {
+ endOfStreamSignalled = true;
+ return;
+ }
+ records.add(record);
+ }
+
+ public Queue<T> getRecords() {
+ return records;
+ }
+
+ public boolean isEndOfStreamSignalled() {
+ return endOfStreamSignalled;
+ }
+
+ @Override
+ public void close() {
+ }
+ }
}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
index ba41a7447..d14c982e9 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.formats.inlongmsg;
+import com.esotericsoftware.minlog.Log;
import com.google.common.base.Objects;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -82,6 +83,7 @@ public class InLongMsgDeserializationSchema implements DeserializationSchema<Row
head = InLongMsgUtils.parseHead(attr);
} catch (Throwable t) {
if (ignoreErrors) {
+ Log.warn("Ignore inlong msg attr({})parse error.", attr, t);
continue;
}
throw new IOException(