You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rg...@apache.org on 2022/03/01 16:24:23 UTC
[pulsar] 14/21: [Pulsar SQL] Fix PulsarRecordCursor deserialize issue. (#14379)
This is an automated email from the ASF dual-hosted git repository.
rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 86fa49620191fd265dc27055f6840a100253aea8
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Feb 25 16:07:42 2022 +0800
[Pulsar SQL] Fix PulsarRecordCursor deserialize issue. (#14379)
(cherry picked from commit a96a1584ba2e9d19c6919b7597a0f344a2af1a35)
---
.../pulsar/sql/presto/PulsarRecordCursor.java | 202 +++++++++++----------
1 file changed, 111 insertions(+), 91 deletions(-)
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index d839e05..56ae17b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -45,7 +45,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
@@ -110,6 +112,7 @@ public class PulsarRecordCursor implements RecordCursor {
private final long splitSize;
private long entriesProcessed = 0;
private int partition = -1;
+ private volatile Throwable deserializingError;
private PulsarSqlSchemaInfoProvider schemaInfoProvider;
@@ -236,113 +239,125 @@ public class PulsarRecordCursor implements RecordCursor {
}
@VisibleForTesting
- class DeserializeEntries implements Runnable {
+ class DeserializeEntries extends Thread {
- protected boolean isRunning = false;
+ private final AtomicBoolean isRunning;
- private final Thread thread;
+ private final CompletableFuture<Void> closeHandle;
public DeserializeEntries() {
- this.thread = new Thread(this, "derserialize-thread-split-" + pulsarSplit.getSplitId());
+ super("deserialize-thread-split-" + pulsarSplit.getSplitId());
+ this.isRunning = new AtomicBoolean(false);
+ this.closeHandle = new CompletableFuture<>();
}
- public void interrupt() {
- isRunning = false;
- thread.interrupt();
+ @Override
+ public void start() {
+ if (isRunning.compareAndSet(false, true)) {
+ super.start();
+ }
}
- public void start() {
- this.thread.start();
+ public CompletableFuture<Void> close() {
+ if (isRunning.compareAndSet(true, false)) {
+ super.interrupt();
+ }
+ return closeHandle;
}
@Override
public void run() {
- isRunning = true;
- while (isRunning) {
-
- int read = entryQueue.drain(new MessagePassingQueue.Consumer<Entry>() {
- @Override
- public void accept(Entry entry) {
-
- try {
- entryQueueCacheSizeAllocator.release(entry.getLength());
-
- long bytes = entry.getDataBuffer().readableBytes();
- completedBytes += bytes;
- // register stats for bytes read
- metricsTracker.register_BYTES_READ(bytes);
-
- // check if we have processed all entries in this split
- // and no incomplete chunked messages exist
- if (entryExceedSplitEndPosition(entry) && chunkedMessagesMap.isEmpty()) {
- return;
- }
-
- // set start time for time deserializing entries for stats
- metricsTracker.start_ENTRY_DESERIALIZE_TIME();
+ try {
+ while (isRunning.get()) {
+ int read = entryQueue.drain(new MessagePassingQueue.Consumer<Entry>() {
+ @Override
+ public void accept(Entry entry) {
try {
- MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
- entry.getDataBuffer(), (message) -> {
- try {
- // start time for message queue read
- metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
-
- if (message.getNumChunksFromMsg() > 1) {
- message = processChunkedMessages(message);
- } else if (entryExceedSplitEndPosition(entry)) {
- // skip no chunk or no multi chunk message
- // that exceed split end position
- message.release();
- message = null;
- }
- if (message != null) {
- while (true) {
- if (!haveAvailableCacheSize(
- messageQueueCacheSizeAllocator, messageQueue)
- || !messageQueue.offer(message)) {
- Thread.sleep(1);
- } else {
- messageQueueCacheSizeAllocator.allocate(
- message.getData().readableBytes());
- break;
+ entryQueueCacheSizeAllocator.release(entry.getLength());
+
+ long bytes = entry.getDataBuffer().readableBytes();
+ completedBytes += bytes;
+ // register stats for bytes read
+ metricsTracker.register_BYTES_READ(bytes);
+
+ // check if we have processed all entries in this split
+ // and no incomplete chunked messages exist
+ if (entryExceedSplitEndPosition(entry) && chunkedMessagesMap.isEmpty()) {
+ return;
+ }
+
+ // set start time for time deserializing entries for stats
+ metricsTracker.start_ENTRY_DESERIALIZE_TIME();
+
+ try {
+ MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
+ entry.getDataBuffer(), (message) -> {
+ try {
+ // start time for message queue read
+ metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
+
+ if (message.getNumChunksFromMsg() > 1) {
+ message = processChunkedMessages(message);
+ } else if (entryExceedSplitEndPosition(entry)) {
+ // skip no chunk or no multi chunk message
+ // that exceed split end position
+ message.release();
+ message = null;
+ }
+ if (message != null) {
+ while (true) {
+ if (!haveAvailableCacheSize(
+ messageQueueCacheSizeAllocator, messageQueue)
+ || !messageQueue.offer(message)) {
+ Thread.sleep(1);
+ } else {
+ messageQueueCacheSizeAllocator.allocate(
+ message.getData().readableBytes());
+ break;
+ }
}
}
- }
- // stats for how long a read from message queue took
- metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
- // stats for number of messages read
- metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
-
- } catch (InterruptedException e) {
- //no-op
- }
- }, pulsarConnectorConfig.getMaxMessageSize());
- } catch (IOException e) {
- log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
- throw new RuntimeException(e);
- }
- // stats for time spend deserializing entries
- metricsTracker.end_ENTRY_DESERIALIZE_TIME();
+ // stats for how long a read from message queue took
+ metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
+ // stats for number of messages read
+ metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
- // stats for num messages per entry
- metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
-
- } finally {
- entriesProcessed++;
- entry.release();
+ } catch (InterruptedException e) {
+ //no-op
+ }
+ }, pulsarConnectorConfig.getMaxMessageSize());
+ } catch (IOException e) {
+ log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
+ throw new RuntimeException(e);
+ }
+ // stats for time spend deserializing entries
+ metricsTracker.end_ENTRY_DESERIALIZE_TIME();
+
+ // stats for num messages per entry
+ metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
+
+ } finally {
+ entriesProcessed++;
+ entry.release();
+ }
}
- }
- });
+ });
- if (read <= 0) {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- return;
+ if (read <= 0) {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ return;
+ }
}
}
+ closeHandle.complete(null);
+ } catch (Throwable ex) {
+ log.error(ex, "Stop running DeserializeEntries");
+ closeHandle.completeExceptionally(ex);
+ throw ex;
}
}
}
@@ -468,6 +483,9 @@ public class PulsarRecordCursor implements RecordCursor {
if (readEntries == null) {
// start deserialize thread
deserializeEntries = new DeserializeEntries();
+ deserializeEntries.setUncaughtExceptionHandler((t, ex) -> {
+ deserializingError = ex;
+ });
deserializeEntries.start();
readEntries = new ReadEntries();
@@ -492,6 +510,8 @@ public class PulsarRecordCursor implements RecordCursor {
if (currentMessage != null) {
messageQueueCacheSizeAllocator.release(currentMessage.getData().readableBytes());
break;
+ } else if (deserializingError != null) {
+ throw new RuntimeException(deserializingError);
} else {
try {
Thread.sleep(1);
@@ -503,7 +523,7 @@ public class PulsarRecordCursor implements RecordCursor {
}
}
- //start time for deseralizing record
+ //start time for deserializing record
metricsTracker.start_RECORD_DESERIALIZE_TIME();
SchemaInfo schemaInfo = getSchemaInfo(pulsarSplit);
@@ -714,12 +734,12 @@ public class PulsarRecordCursor implements RecordCursor {
messageQueue.drain(RawMessage::release);
}
- if (entryQueue != null) {
- entryQueue.drain(Entry::release);
- }
-
if (deserializeEntries != null) {
- deserializeEntries.interrupt();
+ deserializeEntries.close().whenComplete((r, t) -> {
+ if (entryQueue != null) {
+ entryQueue.drain(Entry::release);
+ }
+ });
}
if (this.cursor != null) {