You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/13 03:22:27 UTC
[pulsar] branch master updated: [Pulsar SQL] Support query chunked messages feature in Pulsar SQL (#12720)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 93b74b5 [Pulsar SQL] Support query chunked messages feature in Pulsar SQL (#12720)
93b74b5 is described below
commit 93b74b5498ced9e75512c593e6e5a9f5a6c8f26b
Author: ran <ga...@126.com>
AuthorDate: Mon Dec 13 11:20:47 2021 +0800
[Pulsar SQL] Support query chunked messages feature in Pulsar SQL (#12720)
### Motivation
Currently, Pulsar SQL does not support querying chunked messages.
### Modifications
Add a chunked message map in `PulsarRecordCursor` to maintain incomplete chunked messages. Once a chunked message has been received completely, it is offered to the message queue to await deserialization.
---
.../apache/pulsar/client/impl/ProducerImpl.java | 9 +
.../apache/pulsar/common/api/raw/RawMessage.java | 29 +++
.../pulsar/common/api/raw/RawMessageImpl.java | 44 +++++
.../pulsar/sql/presto/PulsarRecordCursor.java | 141 ++++++++++++--
.../pulsar/sql/presto/TestReadChunkedMessages.java | 214 +++++++++++++++++++++
5 files changed, 424 insertions(+), 13 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 1652113..dbebfb9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -478,7 +478,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
sequenceId = msgMetadata.getSequenceId();
}
String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+ byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion() ?
+ msg.getMessageBuilder().getSchemaVersion() : null;
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+ // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
+ // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
+ // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
+ // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
+ if (chunkId > 0 && schemaVersion != null) {
+ msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+ }
serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
compressedPayload.readableBytes(), uncompressedSize, callback);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
index d093628..483b5a3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
@@ -121,4 +121,33 @@ public interface RawMessage {
* @return true if the key is base64 encoded, false otherwise
*/
boolean hasBase64EncodedKey();
+
+ /**
+ * Get uuid of chunked message.
+ *
+ * @return uuid
+ */
+ String getUUID();
+
+ /**
+ * Get chunkId of chunked message.
+ *
+ * @return chunkId
+ */
+ int getChunkId();
+
+ /**
+ * Get chunk num of chunked message.
+ *
+ * @return chunk num
+ */
+ int getNumChunksFromMsg();
+
+ /**
+ * Get chunk message total size in bytes.
+ *
+ * @return chunked message total size in bytes
+ */
+ int getTotalChunkMsgSize();
+
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index defc1b4..3aa0cbc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -81,6 +81,14 @@ public class RawMessageImpl implements RawMessage {
return msg;
}
+ public RawMessage updatePayloadForChunkedMessage(ByteBuf chunkedTotalPayload) {
+ if (!msgMetadata.getMetadata().hasNumChunksFromMsg() || msgMetadata.getMetadata().getNumChunksFromMsg() <= 1) {
+ throw new RuntimeException("The update payload operation only support multi chunked messages.");
+ }
+ payload = chunkedTotalPayload;
+ return this;
+ }
+
@Override
public Map<String, String> getProperties() {
if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) {
@@ -170,6 +178,42 @@ public class RawMessageImpl implements RawMessage {
return msgMetadata.getMetadata().isPartitionKeyB64Encoded();
}
+ @Override
+ public String getUUID() {
+ if (msgMetadata.getMetadata().hasUuid()) {
+ return msgMetadata.getMetadata().getUuid();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public int getChunkId() {
+ if (msgMetadata.getMetadata().hasChunkId()) {
+ return msgMetadata.getMetadata().getChunkId();
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public int getNumChunksFromMsg() {
+ if (msgMetadata.getMetadata().hasNumChunksFromMsg()) {
+ return msgMetadata.getMetadata().getNumChunksFromMsg();
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public int getTotalChunkMsgSize() {
+ if (msgMetadata.getMetadata().hasTotalChunkMsgSize()) {
+ return msgMetadata.getMetadata().getTotalChunkMsgSize();
+ } else {
+ return -1;
+ }
+ }
+
public int getBatchSize() {
return msgMetadata.getMetadata().getNumMessagesInBatch();
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index b1230d3..558b87b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -29,6 +29,9 @@ import com.google.common.annotations.VisibleForTesting;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
import io.prestosql.decoder.DecoderColumnHandle;
import io.prestosql.decoder.FieldValueProvider;
import io.prestosql.spi.block.Block;
@@ -58,6 +61,8 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.api.raw.RawMessageIdImpl;
+import org.apache.pulsar.common.api.raw.RawMessageImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -66,6 +71,7 @@ import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.sql.presto.util.CacheSizeAllocator;
import org.apache.pulsar.sql.presto.util.NoStrictCacheSizeAllocator;
import org.apache.pulsar.sql.presto.util.NullCacheSizeAllocator;
@@ -112,6 +118,8 @@ public class PulsarRecordCursor implements RecordCursor {
PulsarDispatchingRowDecoderFactory decoderFactory;
+ protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
+
private static final Logger log = Logger.get(PulsarRecordCursor.class);
public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit,
@@ -265,7 +273,8 @@ public class PulsarRecordCursor implements RecordCursor {
metricsTracker.register_BYTES_READ(bytes);
// check if we have processed all entries in this split
- if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0) {
+ // and no incomplete chunked messages exist
+ if (entryExceedSplitEndPosition(entry) && chunkedMessagesMap.isEmpty()) {
return;
}
@@ -279,15 +288,25 @@ public class PulsarRecordCursor implements RecordCursor {
// start time for message queue read
metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
- while (true) {
- if (!haveAvailableCacheSize(
- messageQueueCacheSizeAllocator, messageQueue)
- || !messageQueue.offer(message)) {
- Thread.sleep(1);
- } else {
- messageQueueCacheSizeAllocator.allocate(
- message.getData().readableBytes());
- break;
+ if (message.getNumChunksFromMsg() > 1) {
+ message = processChunkedMessages(message);
+ } else if (entryExceedSplitEndPosition(entry)) {
+ // skip no chunk or no multi chunk message
+ // that exceed split end position
+ message.release();
+ message = null;
+ }
+ if (message != null) {
+ while (true) {
+ if (!haveAvailableCacheSize(
+ messageQueueCacheSizeAllocator, messageQueue)
+ || !messageQueue.offer(message)) {
+ Thread.sleep(1);
+ } else {
+ messageQueueCacheSizeAllocator.allocate(
+ message.getData().readableBytes());
+ break;
+ }
}
}
@@ -328,6 +347,10 @@ public class PulsarRecordCursor implements RecordCursor {
}
}
+ private boolean entryExceedSplitEndPosition(Entry entry) {
+ return ((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0;
+ }
+
@VisibleForTesting
class ReadEntries implements AsyncCallbacks.ReadEntriesCallback {
@@ -341,8 +364,9 @@ public class PulsarRecordCursor implements RecordCursor {
public void run() {
if (outstandingReadsRequests.get() > 0) {
- if (!cursor.hasMoreEntries() || ((PositionImpl) cursor.getReadPosition())
- .compareTo(pulsarSplit.getEndPosition()) >= 0) {
+ if (!cursor.hasMoreEntries() ||
+ (((PositionImpl) cursor.getReadPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0
+ && chunkedMessagesMap.isEmpty())) {
isDone = true;
} else {
@@ -408,7 +432,7 @@ public class PulsarRecordCursor implements RecordCursor {
public boolean hasFinished() {
return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >= 1
- && splitSize <= entriesProcessed;
+ && splitSize <= entriesProcessed && chunkedMessagesMap.isEmpty();
}
@Override
@@ -732,4 +756,95 @@ public class PulsarRecordCursor implements RecordCursor {
}
}
+ private RawMessage processChunkedMessages(RawMessage message) {
+ final String uuid = message.getUUID();
+ final int chunkId = message.getChunkId();
+ final int totalChunkMsgSize = message.getTotalChunkMsgSize();
+ final int numChunks = message.getNumChunksFromMsg();
+
+ RawMessageIdImpl rawMessageId = (RawMessageIdImpl) message.getMessageId();
+ if (rawMessageId.getLedgerId() > pulsarSplit.getEndPositionLedgerId()
+ && !chunkedMessagesMap.containsKey(uuid)) {
+ // If the message is out of the split range, we only care about the incomplete chunked messages.
+ message.release();
+ return null;
+ }
+ if (chunkId == 0) {
+ ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize);
+ chunkedMessagesMap.computeIfAbsent(uuid, (key) -> ChunkedMessageCtx.get(numChunks, chunkedMsgBuffer));
+ }
+
+ ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+ if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+ || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) || chunkId >= numChunks) {
+ // Means we lost the first chunk, it will happen when the beginning chunk didn't belong to this split.
+ log.info("Received unexpected chunk. messageId: %s, last-chunk-id: %s chunkId: %s, totalChunks: %s",
+ message.getMessageId(),
+ (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), chunkId,
+ numChunks);
+ if (chunkedMsgCtx != null) {
+ if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+ ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+ }
+ chunkedMsgCtx.recycle();
+ }
+ chunkedMessagesMap.remove(uuid);
+ message.release();
+ return null;
+ }
+
+ // append the chunked payload and update lastChunkedMessage-id
+ chunkedMsgCtx.chunkedMsgBuffer.writeBytes(message.getData());
+ chunkedMsgCtx.lastChunkedMessageId = chunkId;
+
+ // if final chunk is not received yet then release payload and return
+ if (chunkId != (numChunks - 1)) {
+ message.release();
+ return null;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Chunked message completed. chunkId: %s, totalChunks: %s, msgId: %s, sequenceId: %s",
+ chunkId, numChunks, rawMessageId, message.getSequenceId());
+ }
+ chunkedMessagesMap.remove(uuid);
+ ByteBuf unCompressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
+ chunkedMsgCtx.recycle();
+ // The chunked message complete, we use the entire payload to instead of the last chunk payload.
+ return ((RawMessageImpl) message).updatePayloadForChunkedMessage(unCompressedPayload);
+ }
+
+ static class ChunkedMessageCtx {
+
+ protected int totalChunks = -1;
+ protected ByteBuf chunkedMsgBuffer;
+ protected int lastChunkedMessageId = -1;
+
+ static ChunkedMessageCtx get(int numChunksFromMsg, ByteBuf chunkedMsgBuffer) {
+ ChunkedMessageCtx ctx = RECYCLER.get();
+ ctx.totalChunks = numChunksFromMsg;
+ ctx.chunkedMsgBuffer = chunkedMsgBuffer;
+ return ctx;
+ }
+
+ private final Recycler.Handle<ChunkedMessageCtx> recyclerHandle;
+
+ private ChunkedMessageCtx(Recycler.Handle<ChunkedMessageCtx> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private static final Recycler<ChunkedMessageCtx> RECYCLER = new Recycler<ChunkedMessageCtx>() {
+ protected ChunkedMessageCtx newObject(Recycler.Handle<ChunkedMessageCtx> handle) {
+ return new ChunkedMessageCtx(handle);
+ }
+ };
+
+ public void recycle() {
+ this.totalChunks = -1;
+ this.chunkedMsgBuffer = null;
+ this.lastChunkedMessageId = -1;
+ recyclerHandle.recycle(this);
+ }
+ }
+
}
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java
new file mode 100644
index 0000000..0a02dc3
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import com.google.common.collect.Sets;
+import io.prestosql.spi.connector.ConnectorContext;
+import io.prestosql.spi.predicate.TupleDomain;
+import io.prestosql.testing.TestingConnectorContext;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test read chunked messages.
+ */
+@Test
+@Slf4j
+public class TestReadChunkedMessages extends MockedPulsarServiceBaseTest {
+
+ private final static int MAX_MESSAGE_SIZE = 1024 * 1024;
+
+ @EqualsAndHashCode
+ @Data
+ static class Movie {
+ private String name;
+ private Long publishTime;
+ private byte[] binaryData;
+ }
+
+ @EqualsAndHashCode
+ @Data
+ static class MovieMessage {
+ private Movie movie;
+ private String messageId;
+ }
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ conf.setMaxMessageSize(MAX_MESSAGE_SIZE);
+ conf.setManagedLedgerMaxEntriesPerLedger(5);
+ conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+ internalSetup();
+
+ admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+
+ // so that clients can test short names
+ admin.tenants().createTenant("public",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ admin.namespaces().createNamespace("public/default");
+ admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ internalCleanup();
+ }
+
+ @Test
+ public void queryTest() throws Exception {
+ String topic = "chunk-topic";
+ TopicName topicName = TopicName.get(topic);
+ int messageCnt = 20;
+ Set<MovieMessage> messageSet = prepareChunkedData(topic, messageCnt);
+ SchemaInfo schemaInfo = Schema.AVRO(Movie.class).getSchemaInfo();
+
+ PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
+ connectorConfig.setWebServiceUrl(pulsar.getWebServiceAddress());
+ PulsarSplitManager pulsarSplitManager = new PulsarSplitManager(new PulsarConnectorId("1"), connectorConfig);
+ Collection<PulsarSplit> splits = pulsarSplitManager.getSplitsForTopic(
+ topicName.getPersistenceNamingEncoding(),
+ pulsar.getManagedLedgerFactory(),
+ new ManagedLedgerConfig(),
+ 3,
+ new PulsarTableHandle("1", topicName.getNamespace(), topic, topic),
+ schemaInfo,
+ topic,
+ TupleDomain.all(),
+ null);
+
+ List<PulsarColumnHandle> columnHandleList = TestPulsarConnector.getColumnColumnHandles(
+ topicName, schemaInfo, PulsarColumnHandle.HandleKeyValueType.NONE, true);
+ ConnectorContext prestoConnectorContext = new TestingConnectorContext();
+
+ for (PulsarSplit split : splits) {
+ queryAndCheck(columnHandleList, split, connectorConfig, prestoConnectorContext, messageSet);
+ }
+ Assert.assertTrue(messageSet.isEmpty());
+ }
+
+ private Set<MovieMessage> prepareChunkedData(String topic, int messageCnt) throws PulsarClientException, InterruptedException {
+ pulsarClient.newConsumer(Schema.AVRO(Movie.class))
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscribe()
+ .close();
+ Producer<Movie> producer = pulsarClient.newProducer(Schema.AVRO(Movie.class))
+ .topic(topic)
+ .enableBatching(false)
+ .enableChunking(true)
+ .create();
+ Set<MovieMessage> messageSet = new LinkedHashSet<>();
+ CountDownLatch countDownLatch = new CountDownLatch(messageCnt);
+ for (int i = 0; i < messageCnt; i++) {
+ final double dataTimes = (i % 5) * 0.5;
+ byte[] movieBinaryData = RandomUtils.nextBytes((int) (MAX_MESSAGE_SIZE * dataTimes));
+ final int length = movieBinaryData.length;
+ final int index = i;
+
+ Movie movie = new Movie();
+ movie.setName("movie-" + i);
+ movie.setPublishTime(System.currentTimeMillis());
+ movie.setBinaryData(movieBinaryData);
+ producer.newMessage().value(movie).sendAsync()
+ .whenComplete((msgId, throwable) -> {
+ if (throwable != null) {
+ log.error("Failed to produce message.", throwable);
+ countDownLatch.countDown();
+ return;
+ }
+ MovieMessage movieMessage = new MovieMessage();
+ movieMessage.setMovie(movie);
+ MessageIdImpl messageId = (MessageIdImpl) msgId;
+ movieMessage.setMessageId("(" + messageId.getLedgerId() + "," + messageId.getEntryId() + ",0)");
+ messageSet.add(movieMessage);
+ countDownLatch.countDown();
+ });
+ }
+ countDownLatch.await();
+ Assert.assertEquals(messageCnt, messageSet.size());
+ producer.close();
+ return messageSet;
+ }
+
+ private void queryAndCheck(List<PulsarColumnHandle> columnHandleList,
+ PulsarSplit split,
+ PulsarConnectorConfig connectorConfig,
+ ConnectorContext prestoConnectorContext,
+ Set<MovieMessage> messageSet) {
+ PulsarRecordCursor pulsarRecordCursor = new PulsarRecordCursor(
+ columnHandleList, split, connectorConfig, pulsar.getManagedLedgerFactory(),
+ new ManagedLedgerConfig(), new PulsarConnectorMetricsTracker(new NullStatsProvider()),
+ new PulsarDispatchingRowDecoderFactory(prestoConnectorContext.getTypeManager()));
+
+ AtomicInteger receiveMsgCnt = new AtomicInteger(messageSet.size());
+ while (pulsarRecordCursor.advanceNextPosition()) {
+ Movie movie = new Movie();
+ MovieMessage movieMessage = new MovieMessage();
+ movieMessage.setMovie(movie);
+ for (int i = 0; i < columnHandleList.size(); i++) {
+ switch (columnHandleList.get(i).getName()) {
+ case "binaryData":
+ movie.setBinaryData(pulsarRecordCursor.getSlice(i).getBytes());
+ break;
+ case "name":
+ movie.setName(new String(pulsarRecordCursor.getSlice(i).getBytes()));
+ break;
+ case "publishTime":
+ movie.setPublishTime(pulsarRecordCursor.getLong(i));
+ break;
+ case "__message_id__":
+ movieMessage.setMessageId(new String(pulsarRecordCursor.getSlice(i).getBytes()));
+ default:
+ // do nothing
+ break;
+ }
+ }
+
+ Assert.assertTrue(messageSet.contains(movieMessage));
+ messageSet.remove(movieMessage);
+ receiveMsgCnt.decrementAndGet();
+ }
+ }
+
+}