You are viewing a plain-text version of this content; the link to the canonical version is not preserved in plain text.
Posted to commits@pulsar.apache.org by rg...@apache.org on 2022/03/01 16:24:23 UTC

[pulsar] 14/21: [Pulsar SQL] Fix PulsarRecordCursor deserialize issue. (#14379)

This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 86fa49620191fd265dc27055f6840a100253aea8
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Feb 25 16:07:42 2022 +0800

    [Pulsar SQL] Fix PulsarRecordCursor deserialize issue. (#14379)
    
    (cherry picked from commit a96a1584ba2e9d19c6919b7597a0f344a2af1a35)
---
 .../pulsar/sql/presto/PulsarRecordCursor.java      | 202 +++++++++++----------
 1 file changed, 111 insertions(+), 91 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index d839e05..56ae17b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -45,7 +45,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
@@ -110,6 +112,7 @@ public class PulsarRecordCursor implements RecordCursor {
     private final long splitSize;
     private long entriesProcessed = 0;
     private int partition = -1;
+    private volatile Throwable deserializingError;
 
 
     private PulsarSqlSchemaInfoProvider schemaInfoProvider;
@@ -236,113 +239,125 @@ public class PulsarRecordCursor implements RecordCursor {
     }
 
     @VisibleForTesting
-    class DeserializeEntries implements Runnable {
+    class DeserializeEntries extends Thread {
 
-        protected boolean isRunning = false;
+        private final AtomicBoolean isRunning;
 
-        private final Thread thread;
+        private final CompletableFuture<Void> closeHandle;
 
         public DeserializeEntries() {
-            this.thread = new Thread(this, "derserialize-thread-split-" + pulsarSplit.getSplitId());
+            super("deserialize-thread-split-" + pulsarSplit.getSplitId());
+            this.isRunning = new AtomicBoolean(false);
+            this.closeHandle = new CompletableFuture<>();
         }
 
-        public void interrupt() {
-            isRunning = false;
-            thread.interrupt();
+        @Override
+        public void start() {
+            if (isRunning.compareAndSet(false, true)) {
+                super.start();
+            }
         }
 
-        public void start() {
-            this.thread.start();
+        public CompletableFuture<Void> close() {
+            if (isRunning.compareAndSet(true, false)) {
+                super.interrupt();
+            }
+            return closeHandle;
         }
 
         @Override
         public void run() {
-            isRunning = true;
-            while (isRunning) {
-
-                 int read = entryQueue.drain(new MessagePassingQueue.Consumer<Entry>() {
-                    @Override
-                    public void accept(Entry entry) {
-
-                        try {
-                            entryQueueCacheSizeAllocator.release(entry.getLength());
-
-                            long bytes = entry.getDataBuffer().readableBytes();
-                            completedBytes += bytes;
-                            // register stats for bytes read
-                            metricsTracker.register_BYTES_READ(bytes);
-
-                            // check if we have processed all entries in this split
-                            // and no incomplete chunked messages exist
-                            if (entryExceedSplitEndPosition(entry) && chunkedMessagesMap.isEmpty()) {
-                                return;
-                            }
-
-                            // set start time for time deserializing entries for stats
-                            metricsTracker.start_ENTRY_DESERIALIZE_TIME();
+            try {
+                while (isRunning.get()) {
+                    int read = entryQueue.drain(new MessagePassingQueue.Consumer<Entry>() {
+                        @Override
+                        public void accept(Entry entry) {
 
                             try {
-                                MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
-                                        entry.getDataBuffer(), (message) -> {
-                                            try {
-                                                // start time for message queue read
-                                                metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
-
-                                                if (message.getNumChunksFromMsg() > 1)  {
-                                                    message = processChunkedMessages(message);
-                                                } else if (entryExceedSplitEndPosition(entry)) {
-                                                    // skip no chunk or no multi chunk message
-                                                    // that exceed split end position
-                                                    message.release();
-                                                    message = null;
-                                                }
-                                                if (message != null) {
-                                                    while (true) {
-                                                        if (!haveAvailableCacheSize(
-                                                                messageQueueCacheSizeAllocator, messageQueue)
-                                                                || !messageQueue.offer(message)) {
-                                                            Thread.sleep(1);
-                                                        } else {
-                                                            messageQueueCacheSizeAllocator.allocate(
-                                                                    message.getData().readableBytes());
-                                                            break;
+                                entryQueueCacheSizeAllocator.release(entry.getLength());
+
+                                long bytes = entry.getDataBuffer().readableBytes();
+                                completedBytes += bytes;
+                                // register stats for bytes read
+                                metricsTracker.register_BYTES_READ(bytes);
+
+                                // check if we have processed all entries in this split
+                                // and no incomplete chunked messages exist
+                                if (entryExceedSplitEndPosition(entry) && chunkedMessagesMap.isEmpty()) {
+                                    return;
+                                }
+
+                                // set start time for time deserializing entries for stats
+                                metricsTracker.start_ENTRY_DESERIALIZE_TIME();
+
+                                try {
+                                    MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
+                                            entry.getDataBuffer(), (message) -> {
+                                                try {
+                                                    // start time for message queue read
+                                                    metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
+
+                                                    if (message.getNumChunksFromMsg() > 1)  {
+                                                        message = processChunkedMessages(message);
+                                                    } else if (entryExceedSplitEndPosition(entry)) {
+                                                        // skip no chunk or no multi chunk message
+                                                        // that exceed split end position
+                                                        message.release();
+                                                        message = null;
+                                                    }
+                                                    if (message != null) {
+                                                        while (true) {
+                                                            if (!haveAvailableCacheSize(
+                                                                    messageQueueCacheSizeAllocator, messageQueue)
+                                                                    || !messageQueue.offer(message)) {
+                                                                Thread.sleep(1);
+                                                            } else {
+                                                                messageQueueCacheSizeAllocator.allocate(
+                                                                        message.getData().readableBytes());
+                                                                break;
+                                                            }
                                                         }
                                                     }
-                                                }
 
-                                                // stats for how long a read from message queue took
-                                                metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
-                                                // stats for number of messages read
-                                                metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
-
-                                            } catch (InterruptedException e) {
-                                                //no-op
-                                            }
-                                        }, pulsarConnectorConfig.getMaxMessageSize());
-                            } catch (IOException e) {
-                                log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
-                                throw new RuntimeException(e);
-                            }
-                            // stats for time spend deserializing entries
-                            metricsTracker.end_ENTRY_DESERIALIZE_TIME();
+                                                    // stats for how long a read from message queue took
+                                                    metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
+                                                    // stats for number of messages read
+                                                    metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
 
-                            // stats for num messages per entry
-                            metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
-
-                        } finally {
-                            entriesProcessed++;
-                            entry.release();
+                                                } catch (InterruptedException e) {
+                                                    //no-op
+                                                }
+                                            }, pulsarConnectorConfig.getMaxMessageSize());
+                                } catch (IOException e) {
+                                    log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
+                                    throw new RuntimeException(e);
+                                }
+                                // stats for time spend deserializing entries
+                                metricsTracker.end_ENTRY_DESERIALIZE_TIME();
+
+                                // stats for num messages per entry
+                                metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
+
+                            } finally {
+                                entriesProcessed++;
+                                entry.release();
+                            }
                         }
-                    }
-                });
+                    });
 
-                if (read <= 0) {
-                    try {
-                        Thread.sleep(1);
-                    } catch (InterruptedException e) {
-                        return;
+                    if (read <= 0) {
+                        try {
+                            Thread.sleep(1);
+                        } catch (InterruptedException e) {
+                            return;
+                        }
                     }
                 }
+                closeHandle.complete(null);
+            } catch (Throwable ex) {
+                log.error(ex, "Stop running DeserializeEntries");
+                closeHandle.completeExceptionally(ex);
+                throw ex;
             }
         }
     }
@@ -468,6 +483,9 @@ public class PulsarRecordCursor implements RecordCursor {
         if (readEntries == null) {
             // start deserialize thread
             deserializeEntries = new DeserializeEntries();
+            deserializeEntries.setUncaughtExceptionHandler((t, ex) -> {
+                deserializingError = ex;
+            });
             deserializeEntries.start();
 
             readEntries = new ReadEntries();
@@ -492,6 +510,8 @@ public class PulsarRecordCursor implements RecordCursor {
             if (currentMessage != null) {
                 messageQueueCacheSizeAllocator.release(currentMessage.getData().readableBytes());
                 break;
+            } else if (deserializingError != null) {
+                throw new RuntimeException(deserializingError);
             } else {
                 try {
                     Thread.sleep(1);
@@ -503,7 +523,7 @@ public class PulsarRecordCursor implements RecordCursor {
             }
         }
 
-        //start time for deseralizing record
+        //start time for deserializing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
         SchemaInfo schemaInfo = getSchemaInfo(pulsarSplit);
@@ -714,12 +734,12 @@ public class PulsarRecordCursor implements RecordCursor {
             messageQueue.drain(RawMessage::release);
         }
 
-        if (entryQueue != null) {
-            entryQueue.drain(Entry::release);
-        }
-
         if (deserializeEntries != null) {
-            deserializeEntries.interrupt();
+            deserializeEntries.close().whenComplete((r, t) -> {
+                if (entryQueue != null) {
+                    entryQueue.drain(Entry::release);
+                }
+            });
         }
 
         if (this.cursor != null) {