You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/13 03:22:27 UTC

[pulsar] branch master updated: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL (#12720)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 93b74b5  [Pulsar SQL] Support query chunked messages feature in Pulsar SQL (#12720)
93b74b5 is described below

commit 93b74b5498ced9e75512c593e6e5a9f5a6c8f26b
Author: ran <ga...@126.com>
AuthorDate: Mon Dec 13 11:20:47 2021 +0800

    [Pulsar SQL] Support query chunked messages feature in Pulsar SQL (#12720)
    
    ### Motivation
    
    Currently, Pulsar SQL doesn't support querying chunked messages.
    
    ### Modifications
    
    Add a chunked message map in `PulsarRecordCursor` to track incomplete chunked messages. Once all chunks of a message have been received, the reassembled message is offered to the message queue to wait for deserialization.
---
 .../apache/pulsar/client/impl/ProducerImpl.java    |   9 +
 .../apache/pulsar/common/api/raw/RawMessage.java   |  29 +++
 .../pulsar/common/api/raw/RawMessageImpl.java      |  44 +++++
 .../pulsar/sql/presto/PulsarRecordCursor.java      | 141 ++++++++++++--
 .../pulsar/sql/presto/TestReadChunkedMessages.java | 214 +++++++++++++++++++++
 5 files changed, 424 insertions(+), 13 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 1652113..dbebfb9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -478,7 +478,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     sequenceId = msgMetadata.getSequenceId();
                 }
                 String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+                byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion() ?
+                        msg.getMessageBuilder().getSchemaVersion() : null;
                 for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+                    // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
+                    // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+                    // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+                    // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+                    if (chunkId > 0 && schemaVersion != null) {
+                        msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+                    }
                     serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
                             readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
                             compressedPayload.readableBytes(), uncompressedSize, callback);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
index d093628..483b5a3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
@@ -121,4 +121,33 @@ public interface RawMessage {
      * @return true if the key is base64 encoded, false otherwise
      */
     boolean hasBase64EncodedKey();
+
+    /**
+     * Get the UUID of the chunked message this message belongs to.
+     *
+     * <p>Every chunk of the same logical message carries the same UUID, so it is used to group chunks
+     * while reassembling them.
+     *
+     * @return the chunked-message UUID, or {@code null} if the message metadata carries none
+     *         ({@code RawMessageImpl} returns {@code null} in that case)
+     */
+    String getUUID();
+
+    /**
+     * Get the id of this chunk within its chunked message.
+     *
+     * <p>Chunk ids are zero-based and consecutive; the chunk whose id is
+     * {@code getNumChunksFromMsg() - 1} is the final one.
+     *
+     * @return the chunk id, or {@code -1} in {@code RawMessageImpl} if the metadata carries no chunk id
+     */
+    int getChunkId();
+
+    /**
+     * Get the number of chunks the original message was split into.
+     *
+     * <p>A value greater than {@code 1} marks this message as one chunk of a chunked message.
+     *
+     * @return the number of chunks, or {@code -1} in {@code RawMessageImpl} if the metadata carries none
+     */
+    int getNumChunksFromMsg();
+
+    /**
+     * Get the size in bytes of the complete chunked message payload, i.e. of all chunks together.
+     *
+     * @return the total chunked message size in bytes, or {@code -1} in {@code RawMessageImpl} if the
+     *         metadata carries none
+     */
+    int getTotalChunkMsgSize();
+
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index defc1b4..3aa0cbc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -81,6 +81,14 @@ public class RawMessageImpl implements RawMessage {
         return msg;
     }
 
+    /**
+     * Replace the payload of this message with the reassembled payload of the whole chunked message.
+     *
+     * <p>Only valid for a message whose metadata announces more than one chunk. The previous payload
+     * reference is overwritten, not released.
+     * NOTE(review): confirm who owns and releases the replaced chunk payload.
+     *
+     * @param chunkedTotalPayload the complete payload assembled from all chunks
+     * @return this message, for chaining
+     * @throws IllegalStateException if this message is not part of a multi-chunk message
+     */
+    public RawMessage updatePayloadForChunkedMessage(ByteBuf chunkedTotalPayload) {
+        if (!msgMetadata.getMetadata().hasNumChunksFromMsg()
+                || msgMetadata.getMetadata().getNumChunksFromMsg() <= 1) {
+            // Calling this on a non-chunked message is a wrong-state call; IllegalStateException is still a
+            // RuntimeException, so existing callers catching RuntimeException are unaffected.
+            throw new IllegalStateException("The update payload operation only support multi chunked messages.");
+        }
+        payload = chunkedTotalPayload;
+        return this;
+    }
+
     @Override
     public Map<String, String> getProperties() {
         if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) {
@@ -170,6 +178,42 @@ public class RawMessageImpl implements RawMessage {
         return msgMetadata.getMetadata().isPartitionKeyB64Encoded();
     }
 
+    /** Returns the UUID stored in the message metadata, or {@code null} when the field is absent. */
+    @Override
+    public String getUUID() {
+        return msgMetadata.getMetadata().hasUuid()
+                ? msgMetadata.getMetadata().getUuid()
+                : null;
+    }
+
+    /** Returns the chunk id stored in the message metadata, or {@code -1} when the field is absent. */
+    @Override
+    public int getChunkId() {
+        return msgMetadata.getMetadata().hasChunkId()
+                ? msgMetadata.getMetadata().getChunkId()
+                : -1;
+    }
+
+    /** Returns the chunk count stored in the message metadata, or {@code -1} when the field is absent. */
+    @Override
+    public int getNumChunksFromMsg() {
+        return msgMetadata.getMetadata().hasNumChunksFromMsg()
+                ? msgMetadata.getMetadata().getNumChunksFromMsg()
+                : -1;
+    }
+
+    /** Returns the total chunked size stored in the message metadata, or {@code -1} when absent. */
+    @Override
+    public int getTotalChunkMsgSize() {
+        return msgMetadata.getMetadata().hasTotalChunkMsgSize()
+                ? msgMetadata.getMetadata().getTotalChunkMsgSize()
+                : -1;
+    }
+
+    /**
+     * Get the number of messages in the batch, as recorded in the message metadata.
+     */
     public int getBatchSize() {
         return msgMetadata.getMetadata().getNumMessagesInBatch();
     }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index b1230d3..558b87b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -29,6 +29,9 @@ import com.google.common.annotations.VisibleForTesting;
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
 import io.prestosql.decoder.DecoderColumnHandle;
 import io.prestosql.decoder.FieldValueProvider;
 import io.prestosql.spi.block.Block;
@@ -58,6 +61,8 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
 import org.apache.pulsar.common.api.raw.MessageParser;
 import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.api.raw.RawMessageIdImpl;
+import org.apache.pulsar.common.api.raw.RawMessageImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -66,6 +71,7 @@ import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.sql.presto.util.CacheSizeAllocator;
 import org.apache.pulsar.sql.presto.util.NoStrictCacheSizeAllocator;
 import org.apache.pulsar.sql.presto.util.NullCacheSizeAllocator;
@@ -112,6 +118,8 @@ public class PulsarRecordCursor implements RecordCursor {
 
     PulsarDispatchingRowDecoderFactory decoderFactory;
 
+    // Chunked messages still being reassembled, keyed by chunk UUID. While this map is not empty the
+    // cursor keeps reading entries past the split end position so pending messages can complete.
+    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
+
     private static final Logger log = Logger.get(PulsarRecordCursor.class);
 
     public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit,
@@ -265,7 +273,8 @@ public class PulsarRecordCursor implements RecordCursor {
                             metricsTracker.register_BYTES_READ(bytes);
 
                             // check if we have processed all entries in this split
-                            if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0) {
+                            // and no incomplete chunked messages exist
+                            if (entryExceedSplitEndPosition(entry) && chunkedMessagesMap.isEmpty()) {
                                 return;
                             }
 
@@ -279,15 +288,25 @@ public class PulsarRecordCursor implements RecordCursor {
                                                 // start time for message queue read
                                                 metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
 
-                                                while (true) {
-                                                    if (!haveAvailableCacheSize(
-                                                            messageQueueCacheSizeAllocator, messageQueue)
-                                                            || !messageQueue.offer(message)) {
-                                                        Thread.sleep(1);
-                                                    } else {
-                                                        messageQueueCacheSizeAllocator.allocate(
-                                                                message.getData().readableBytes());
-                                                        break;
+                                                if (message.getNumChunksFromMsg() > 1)  {
+                                                    message = processChunkedMessages(message);
+                                                } else if (entryExceedSplitEndPosition(entry)) {
+                                                    // skip no chunk or no multi chunk message
+                                                    // that exceed split end position
+                                                    message.release();
+                                                    message = null;
+                                                }
+                                                if (message != null) {
+                                                    while (true) {
+                                                        if (!haveAvailableCacheSize(
+                                                                messageQueueCacheSizeAllocator, messageQueue)
+                                                                || !messageQueue.offer(message)) {
+                                                            Thread.sleep(1);
+                                                        } else {
+                                                            messageQueueCacheSizeAllocator.allocate(
+                                                                    message.getData().readableBytes());
+                                                            break;
+                                                        }
                                                     }
                                                 }
 
@@ -328,6 +347,10 @@ public class PulsarRecordCursor implements RecordCursor {
         }
     }
 
+    /**
+     * Tells whether the entry sits at or after this split's end position, i.e. outside the split range.
+     */
+    private boolean entryExceedSplitEndPosition(Entry entry) {
+        PositionImpl entryPosition = (PositionImpl) entry.getPosition();
+        return entryPosition.compareTo(pulsarSplit.getEndPosition()) >= 0;
+    }
+
     @VisibleForTesting
     class ReadEntries implements AsyncCallbacks.ReadEntriesCallback {
 
@@ -341,8 +364,9 @@ public class PulsarRecordCursor implements RecordCursor {
         public void run() {
 
             if (outstandingReadsRequests.get() > 0) {
-                if (!cursor.hasMoreEntries() || ((PositionImpl) cursor.getReadPosition())
-                        .compareTo(pulsarSplit.getEndPosition()) >= 0) {
+                if (!cursor.hasMoreEntries() ||
+                        (((PositionImpl) cursor.getReadPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0
+                                && chunkedMessagesMap.isEmpty())) {
                     isDone = true;
 
                 } else {
@@ -408,7 +432,7 @@ public class PulsarRecordCursor implements RecordCursor {
 
+        /**
+         * Reports whether this split is completely processed: the message queue is drained, reading has
+         * been marked done, at least {@code splitSize} entries were processed, and no chunked message is
+         * still waiting for its remaining chunks.
+         * NOTE(review): the {@code outstandingReadsRequests.get() >= 1} term looks like "no read request
+         * in flight" — confirm against how the counter is updated.
+         */
         public boolean hasFinished() {
             return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >= 1
-                && splitSize <= entriesProcessed;
+                && splitSize <= entriesProcessed && chunkedMessagesMap.isEmpty();
         }
 
         @Override
@@ -732,4 +756,95 @@ public class PulsarRecordCursor implements RecordCursor {
         }
     }
 
+    /**
+     * Add one chunk of a multi-chunk message to its reassembly context.
+     *
+     * <p>Chunk payloads are appended in chunk-id order to a buffer kept in {@code chunkedMessagesMap}
+     * under the message UUID. When the final chunk arrives, the reassembled payload replaces the payload
+     * of that final chunk and the message is returned for deserialization. In every other case the chunk
+     * is released and {@code null} is returned.
+     *
+     * <p>A chunked message whose first chunk lies at or after the split end position is treated as
+     * outside this split and skipped. A chunk that cannot be appended discards the whole partially
+     * reassembled message. This happens when the first chunk is missing, the chunk id is unexpected, or
+     * the chunk carries more data than the announced total size.
+     *
+     * @param message one chunk of a message whose metadata announces more than one chunk
+     * @return the complete message if {@code message} was its final chunk, otherwise {@code null}
+     */
+    private RawMessage processChunkedMessages(RawMessage message) {
+        final String uuid = message.getUUID();
+        final int chunkId = message.getChunkId();
+        final int totalChunkMsgSize = message.getTotalChunkMsgSize();
+        final int numChunks = message.getNumChunksFromMsg();
+
+        if (uuid == null) {
+            // Chunks are grouped by UUID, so a chunk without one cannot be reassembled.
+            log.warn("Received chunk without uuid. messageId: %s, chunkId: %s, totalChunks: %s",
+                    message.getMessageId(), chunkId, numChunks);
+            message.release();
+            return null;
+        }
+
+        RawMessageIdImpl rawMessageId = (RawMessageIdImpl) message.getMessageId();
+        PositionImpl chunkPosition = PositionImpl.get(rawMessageId.getLedgerId(), rawMessageId.getEntryId());
+        boolean beyondSplitEnd = chunkPosition.compareTo(pulsarSplit.getEndPosition()) >= 0;
+        if (beyondSplitEnd && !chunkedMessagesMap.containsKey(uuid)) {
+            // Outside the split range, only chunked messages already started inside this split matter.
+            // Comparing the full position (not just the ledger id) also skips new chunked messages that
+            // start in the end ledger at or after the end entry.
+            message.release();
+            return null;
+        }
+
+        if (chunkId == 0 && totalChunkMsgSize >= 0) {
+            // Allocate the buffer inside the mapping function so nothing is allocated (and leaked) when a
+            // context for this UUID already exists.
+            chunkedMessagesMap.computeIfAbsent(uuid, (key) -> ChunkedMessageCtx.get(numChunks,
+                    Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize)));
+        }
+
+        ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+        ByteBuf chunkPayload = message.getData();
+        if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+                || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) || chunkId >= numChunks
+                || chunkPayload.readableBytes() > chunkedMsgCtx.chunkedMsgBuffer.maxWritableBytes()) {
+            // Means we lost the first chunk (it happens when the beginning chunk didn't belong to this split),
+            // received a chunk out of order, or the chunks exceed the announced total size.
+            log.info("Received unexpected chunk. messageId: %s, last-chunk-id: %s chunkId: %s, totalChunks: %s",
+                    message.getMessageId(),
+                    (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), chunkId,
+                    numChunks);
+            chunkedMessagesMap.remove(uuid);
+            if (chunkedMsgCtx != null) {
+                if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+                    ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+                }
+                chunkedMsgCtx.recycle();
+            }
+            message.release();
+            return null;
+        }
+
+        // append the chunked payload and update lastChunkedMessage-id
+        chunkedMsgCtx.chunkedMsgBuffer.writeBytes(chunkPayload);
+        chunkedMsgCtx.lastChunkedMessageId = chunkId;
+
+        // if final chunk is not received yet then release payload and return
+        if (chunkId != (numChunks - 1)) {
+            message.release();
+            return null;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Chunked message completed. chunkId: %s, totalChunks: %s, msgId: %s, sequenceId: %s",
+                    chunkId, numChunks, rawMessageId, message.getSequenceId());
+        }
+        chunkedMessagesMap.remove(uuid);
+        ByteBuf totalPayload = chunkedMsgCtx.chunkedMsgBuffer;
+        chunkedMsgCtx.recycle();
+        // The chunked message is complete: use the entire payload instead of the last chunk's payload.
+        return ((RawMessageImpl) message).updatePayloadForChunkedMessage(totalPayload);
+    }
+
+    /**
+     * Reassembly state of one chunked message: the buffer chunk payloads are appended to, the number of
+     * chunks announced by the metadata, and the id of the last chunk appended.
+     *
+     * <p>Instances are pooled through a Netty {@link Recycler}. {@link #recycle()} does not release
+     * {@link #chunkedMsgBuffer}; the caller must release it or hand it off before recycling.
+     */
+    static class ChunkedMessageCtx {
+
+        private static final Recycler<ChunkedMessageCtx> RECYCLER = new Recycler<ChunkedMessageCtx>() {
+            @Override
+            protected ChunkedMessageCtx newObject(Recycler.Handle<ChunkedMessageCtx> handle) {
+                return new ChunkedMessageCtx(handle);
+            }
+        };
+
+        private final Recycler.Handle<ChunkedMessageCtx> recyclerHandle;
+
+        // Number of chunks announced by the message metadata; -1 while the context is not in use.
+        protected int totalChunks = -1;
+        // Buffer the chunk payloads are appended to, in chunk order.
+        protected ByteBuf chunkedMsgBuffer;
+        // Id of the last chunk appended to chunkedMsgBuffer; -1 before the first chunk.
+        protected int lastChunkedMessageId = -1;
+
+        private ChunkedMessageCtx(Recycler.Handle<ChunkedMessageCtx> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        /**
+         * Take a context from the pool and initialize it for a new chunked message.
+         *
+         * @param numChunksFromMsg number of chunks announced by the message metadata
+         * @param chunkedMsgBuffer buffer the chunk payloads will be appended to
+         * @return an initialized context with no chunk appended yet
+         */
+        static ChunkedMessageCtx get(int numChunksFromMsg, ByteBuf chunkedMsgBuffer) {
+            ChunkedMessageCtx ctx = RECYCLER.get();
+            ctx.totalChunks = numChunksFromMsg;
+            ctx.chunkedMsgBuffer = chunkedMsgBuffer;
+            // Defensive: guarantee a fresh start even if a pooled instance was not reset.
+            ctx.lastChunkedMessageId = -1;
+            return ctx;
+        }
+
+        /**
+         * Clear this context and return it to the pool; the buffer reference is dropped, not released.
+         */
+        public void recycle() {
+            this.totalChunks = -1;
+            this.chunkedMsgBuffer = null;
+            this.lastChunkedMessageId = -1;
+            recyclerHandle.recycle(this);
+        }
+    }
+
 }
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java
new file mode 100644
index 0000000..0a02dc3
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import com.google.common.collect.Sets;
+import io.prestosql.spi.connector.ConnectorContext;
+import io.prestosql.spi.predicate.TupleDomain;
+import io.prestosql.testing.TestingConnectorContext;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that Pulsar SQL can read chunked messages.
+ *
+ * <p>A mix of small and chunked AVRO messages is produced to a topic whose ledgers roll over every five
+ * entries. The topic is then divided into splits, and every produced message must be read exactly once
+ * across all splits.
+ */
+@Test
+@Slf4j
+public class TestReadChunkedMessages extends MockedPulsarServiceBaseTest {
+
+    private static final int MAX_MESSAGE_SIZE = 1024 * 1024;
+
+    @EqualsAndHashCode
+    @Data
+    static class Movie {
+        private String name;
+        private Long publishTime;
+        private byte[] binaryData;
+    }
+
+    @EqualsAndHashCode
+    @Data
+    static class MovieMessage {
+        private Movie movie;
+        // Message id rendered as "(ledgerId,entryId,batchIndex)", compared against the __message_id__ column.
+        private String messageId;
+    }
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        conf.setMaxMessageSize(MAX_MESSAGE_SIZE);
+        // Roll ledgers every 5 entries so the chunks of one message can end up in different ledgers.
+        conf.setManagedLedgerMaxEntriesPerLedger(5);
+        conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+        internalSetup();
+
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+
+        // so that clients can test short names
+        admin.tenants().createTenant("public",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("public/default");
+        admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    @Test
+    public void queryTest() throws Exception {
+        String topic = "chunk-topic";
+        TopicName topicName = TopicName.get(topic);
+        int messageCnt = 20;
+        Set<MovieMessage> messageSet = prepareChunkedData(topic, messageCnt);
+        SchemaInfo schemaInfo = Schema.AVRO(Movie.class).getSchemaInfo();
+
+        PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
+        connectorConfig.setWebServiceUrl(pulsar.getWebServiceAddress());
+        PulsarSplitManager pulsarSplitManager = new PulsarSplitManager(new PulsarConnectorId("1"), connectorConfig);
+        Collection<PulsarSplit> splits = pulsarSplitManager.getSplitsForTopic(
+                topicName.getPersistenceNamingEncoding(),
+                pulsar.getManagedLedgerFactory(),
+                new ManagedLedgerConfig(),
+                3,
+                new PulsarTableHandle("1", topicName.getNamespace(), topic, topic),
+                schemaInfo,
+                topic,
+                TupleDomain.all(),
+                null);
+
+        List<PulsarColumnHandle> columnHandleList = TestPulsarConnector.getColumnColumnHandles(
+                topicName, schemaInfo, PulsarColumnHandle.HandleKeyValueType.NONE, true);
+        ConnectorContext prestoConnectorContext = new TestingConnectorContext();
+
+        // Each split removes the messages it reads from messageSet; all of them must be read in the end.
+        for (PulsarSplit split : splits) {
+            queryAndCheck(columnHandleList, split, connectorConfig, prestoConnectorContext, messageSet);
+        }
+        Assert.assertTrue(messageSet.isEmpty(), "Messages not read by any split: " + messageSet.size());
+    }
+
+    /**
+     * Produce {@code messageCnt} AVRO movies with chunking enabled and return what was sent.
+     *
+     * @return the produced messages with their message ids; safe to mutate from the calling thread
+     */
+    private Set<MovieMessage> prepareChunkedData(String topic, int messageCnt)
+            throws PulsarClientException, InterruptedException {
+        // Create the subscription up front, presumably so the produced entries are retained — confirm.
+        pulsarClient.newConsumer(Schema.AVRO(Movie.class))
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe()
+                .close();
+        Producer<Movie> producer = pulsarClient.newProducer(Schema.AVRO(Movie.class))
+                .topic(topic)
+                .enableBatching(false)
+                .enableChunking(true)
+                .create();
+        // Filled from the producer's send callbacks, so it must tolerate concurrent writers.
+        Set<MovieMessage> messageSet = Collections.synchronizedSet(new LinkedHashSet<>());
+        AtomicInteger sendFailures = new AtomicInteger();
+        CountDownLatch countDownLatch = new CountDownLatch(messageCnt);
+        for (int i = 0; i < messageCnt; i++) {
+            // 0, 0.5, 1, 1.5 or 2 times the max message size; payloads above the limit are sent as chunks.
+            final double dataTimes = (i % 5) * 0.5;
+            byte[] movieBinaryData = RandomUtils.nextBytes((int) (MAX_MESSAGE_SIZE * dataTimes));
+
+            Movie movie = new Movie();
+            movie.setName("movie-" + i);
+            movie.setPublishTime(System.currentTimeMillis());
+            movie.setBinaryData(movieBinaryData);
+            producer.newMessage().value(movie).sendAsync()
+                    .whenComplete((msgId, throwable) -> {
+                        try {
+                            if (throwable != null) {
+                                log.error("Failed to produce message.", throwable);
+                                sendFailures.incrementAndGet();
+                                return;
+                            }
+                            MovieMessage movieMessage = new MovieMessage();
+                            movieMessage.setMovie(movie);
+                            MessageIdImpl messageId = (MessageIdImpl) msgId;
+                            movieMessage.setMessageId(
+                                    "(" + messageId.getLedgerId() + "," + messageId.getEntryId() + ",0)");
+                            messageSet.add(movieMessage);
+                        } finally {
+                            // Always count down, otherwise a failing callback would hang await() forever.
+                            countDownLatch.countDown();
+                        }
+                    });
+        }
+        countDownLatch.await();
+        producer.close();
+        Assert.assertEquals(sendFailures.get(), 0, "Failed to produce some messages");
+        Assert.assertEquals(messageSet.size(), messageCnt);
+        return messageSet;
+    }
+
+    /**
+     * Read every row of {@code split} and remove the matching message from {@code messageSet}, failing on
+     * any row that is unexpected or was already read.
+     */
+    private void queryAndCheck(List<PulsarColumnHandle> columnHandleList,
+                               PulsarSplit split,
+                               PulsarConnectorConfig connectorConfig,
+                               ConnectorContext prestoConnectorContext,
+                               Set<MovieMessage> messageSet) {
+        PulsarRecordCursor pulsarRecordCursor = new PulsarRecordCursor(
+                columnHandleList, split, connectorConfig, pulsar.getManagedLedgerFactory(),
+                new ManagedLedgerConfig(), new PulsarConnectorMetricsTracker(new NullStatsProvider()),
+                new PulsarDispatchingRowDecoderFactory(prestoConnectorContext.getTypeManager()));
+        try {
+            while (pulsarRecordCursor.advanceNextPosition()) {
+                Movie movie = new Movie();
+                MovieMessage movieMessage = new MovieMessage();
+                movieMessage.setMovie(movie);
+                for (int i = 0; i < columnHandleList.size(); i++) {
+                    switch (columnHandleList.get(i).getName()) {
+                        case "binaryData":
+                            movie.setBinaryData(pulsarRecordCursor.getSlice(i).getBytes());
+                            break;
+                        case "name":
+                            movie.setName(pulsarRecordCursor.getSlice(i).toStringUtf8());
+                            break;
+                        case "publishTime":
+                            movie.setPublishTime(pulsarRecordCursor.getLong(i));
+                            break;
+                        case "__message_id__":
+                            movieMessage.setMessageId(pulsarRecordCursor.getSlice(i).toStringUtf8());
+                            break;
+                        default:
+                            // other columns are not checked
+                            break;
+                    }
+                }
+
+                // remove() doubles as a duplicate check: a message read twice is no longer in the set.
+                // Only the id goes into the failure message; the movie's toString would dump megabytes of data.
+                Assert.assertTrue(messageSet.remove(movieMessage),
+                        "Unexpected or duplicate message: " + movieMessage.getMessageId());
+            }
+        } finally {
+            pulsarRecordCursor.close();
+        }
+    }
+
+}