You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/10 19:09:17 UTC

[pulsar] branch master updated: Use raw message when manually parsing messages from topic storage (#3146)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bddfa2a  Use raw message when manually parsing messages from topic storage (#3146)
bddfa2a is described below

commit bddfa2a5a233f17b93cb756b289b40d587321e4c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Dec 10 11:09:12 2018 -0800

    Use raw message when manually parsing messages from topic storage (#3146)
    
    * Use raw message when manually parsing messages from topic storage
    
    * Added missing headers
    
    * Fixed copy to byte[]
---
 .../pulsar/client/impl/MessageParserTest.java      |  30 ++---
 .../pulsar/common/api/raw}/MessageParser.java      |  71 ++++-------
 .../apache/pulsar/common/api/raw/RawMessage.java   | 104 +++++++++++++++
 .../apache/pulsar/common/api/raw/RawMessageId.java |   8 +-
 .../pulsar/common/api/raw/RawMessageIdImpl.java    |  22 +++-
 .../pulsar/common/api/raw/RawMessageImpl.java      | 139 +++++++++++++++++++++
 .../pulsar/sql/presto/AvroSchemaHandler.java       |  19 ++-
 .../pulsar/sql/presto/JSONSchemaHandler.java       |  33 ++++-
 .../pulsar/sql/presto/PulsarInternalColumn.java    |  20 +--
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  70 ++++++-----
 .../org/apache/pulsar/sql/presto/PulsarSplit.java  |   9 +-
 .../apache/pulsar/sql/presto/SchemaHandler.java    |   4 +-
 12 files changed, 404 insertions(+), 125 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
index a16166a..5ec6332 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
@@ -22,7 +22,7 @@ import static org.testng.Assert.assertEquals;
 
 import com.google.common.collect.Sets;
 
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -32,12 +32,10 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.raw.MessageParser;
+import org.apache.pulsar.common.api.raw.RawMessage;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -66,12 +64,6 @@ public class MessageParserTest extends MockedPulsarServiceBaseTest {
         super.internalCleanup();
     }
 
-    public static String extractKey(RawMessage m) throws Exception {
-        ByteBuf headersAndPayload = m.getHeadersAndPayload();
-        MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
-        return msgMetadata.getPartitionKey();
-    }
-
     @Test
     public void testWithoutBatches() throws Exception {
         String topic = "persistent://my-tenant/my-ns/my-topic";
@@ -93,11 +85,11 @@ public class MessageParserTest extends MockedPulsarServiceBaseTest {
         for (int i = 0; i < n; i++) {
             Entry entry = cursor.readEntriesOrWait(1).get(0);
 
-            List<Message<?>> messages = Lists.newArrayList();
+            List<RawMessage> messages = Lists.newArrayList();
 
             try {
                 MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer(),
-                        (messageId, message, payload) -> {
+                        (message) -> {
                             messages.add(message);
                         });
             } finally {
@@ -106,7 +98,9 @@ public class MessageParserTest extends MockedPulsarServiceBaseTest {
 
             assertEquals(messages.size(), 1);
 
-            assertEquals(messages.get(0).getData(), ("hello-" + i).getBytes());
+            assertEquals(messages.get(0).getData(), Unpooled.wrappedBuffer(("hello-" + i).getBytes()));
+
+            messages.forEach(RawMessage::release);
         }
     }
 
@@ -133,11 +127,11 @@ public class MessageParserTest extends MockedPulsarServiceBaseTest {
         assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
         Entry entry = cursor.readEntriesOrWait(1).get(0);
 
-        List<Message<?>> messages = Lists.newArrayList();
+        List<RawMessage> messages = Lists.newArrayList();
 
         try {
             MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer(),
-                    (messageId, message, payload) -> {
+                    (message) -> {
                         messages.add(message);
                     });
         } finally {
@@ -147,9 +141,11 @@ public class MessageParserTest extends MockedPulsarServiceBaseTest {
         assertEquals(messages.size(), 10);
 
         for (int i = 0; i < n; i++) {
-            assertEquals(messages.get(i).getData(), ("hello-" + i).getBytes());
+            assertEquals(messages.get(i).getData(), Unpooled.wrappedBuffer(("hello-" + i).getBytes()));
         }
 
+        messages.forEach(RawMessage::release);
+
         producer.close();
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
similarity index 61%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
index d68ce75..1280837 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.common.api.raw;
 
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
@@ -25,17 +25,13 @@ import static org.apache.pulsar.common.api.Commands.readChecksum;
 import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
-import java.util.Optional;
 
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.PulsarApi;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
@@ -45,7 +41,7 @@ import org.apache.pulsar.common.naming.TopicName;
 @Slf4j
 public class MessageParser {
     public interface MessageProcessor {
-        void process(MessageId messageId, Message<?> message, ByteBuf payload);
+        void process(RawMessage message);
     }
 
     /**
@@ -54,19 +50,12 @@ public class MessageParser {
      */
     public static void parseMessage(TopicName topicName, long ledgerId, long entryId, ByteBuf headersAndPayload,
             MessageProcessor processor) throws IOException {
-        MessageIdImpl msgId = new MessageIdImpl(ledgerId, entryId, -1);
-
-        MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
-        messageIdBuilder.setLedgerId(ledgerId);
-        messageIdBuilder.setEntryId(entryId);
-        MessageIdData messageId = messageIdBuilder.build();
-
         MessageMetadata msgMetadata = null;
         ByteBuf payload = headersAndPayload;
         ByteBuf uncompressedPayload = null;
 
         try {
-            if (!verifyChecksum(headersAndPayload, messageId, topicName.toString(), "reader")) {
+            if (!verifyChecksum(topicName, headersAndPayload, ledgerId, entryId)) {
                 // discard message with checksum error
                 return;
             }
@@ -74,7 +63,7 @@ public class MessageParser {
             try {
                 msgMetadata = Commands.parseMessageMetadata(payload);
             } catch (Throwable t) {
-                log.warn("[{}] Failed to deserialize metadata for message {} - Ignoring", topicName, messageId);
+                log.warn("[{}] Failed to deserialize metadata for message {}:{} - Ignoring", topicName, ledgerId, entryId);
                 return;
             }
 
@@ -82,8 +71,8 @@ public class MessageParser {
                 throw new IOException("Cannot parse encrypted message " + msgMetadata + " on topic " + topicName);
             }
 
-            uncompressedPayload = uncompressPayloadIfNeeded(messageId, msgMetadata, headersAndPayload,
-                    topicName.toString(), "reader");
+            uncompressedPayload = uncompressPayloadIfNeeded(topicName, msgMetadata, headersAndPayload, ledgerId,
+                    entryId);
 
             if (uncompressedPayload == null) {
                 // Message was discarded on decompression error
@@ -93,35 +82,29 @@ public class MessageParser {
             final int numMessages = msgMetadata.getNumMessagesInBatch();
 
             if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
-                final MessageImpl<?> message = new MessageImpl<>(topicName.toString(),
-                                                                 msgId, msgMetadata, uncompressedPayload,
-                                                                 null, null);
-                processor.process(msgId, message, uncompressedPayload);
+
+                processor.process(RawMessageImpl.get(msgMetadata, null, uncompressedPayload, ledgerId, entryId, 0));
             } else {
                 // handle batch message enqueuing; uncompressed payload has all messages in batch
-                receiveIndividualMessagesFromBatch(topicName.toString(), msgMetadata, uncompressedPayload, messageId, null, -1, processor);
+                receiveIndividualMessagesFromBatch(msgMetadata, uncompressedPayload, ledgerId, entryId, processor);
             }
         } finally {
             if (uncompressedPayload != null) {
                 uncompressedPayload.release();
             }
 
-            messageIdBuilder.recycle();
-            messageId.recycle();
             msgMetadata.recycle();
         }
     }
 
-    public static boolean verifyChecksum(ByteBuf headersAndPayload, MessageIdData messageId, String topic,
-            String subscription) {
+    public static boolean verifyChecksum(TopicName topic, ByteBuf headersAndPayload, long ledgerId, long entryId) {
         if (hasChecksum(headersAndPayload)) {
             int checksum = readChecksum(headersAndPayload);
             int computedChecksum = computeChecksum(headersAndPayload);
             if (checksum != computedChecksum) {
                 log.error(
-                        "[{}][{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}",
-                        topic, subscription, messageId.getLedgerId(), messageId.getEntryId(),
-                        Long.toHexString(checksum), Integer.toHexString(computedChecksum));
+                        "[{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}",
+                        topic, ledgerId, entryId, Long.toHexString(checksum), Integer.toHexString(computedChecksum));
                 return false;
             }
         }
@@ -129,15 +112,15 @@ public class MessageParser {
         return true;
     }
 
-    public static ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata,
-            ByteBuf payload, String topic, String subscription) {
+    public static ByteBuf uncompressPayloadIfNeeded(TopicName topic, MessageMetadata msgMetadata,
+            ByteBuf payload, long ledgerId, long entryId) {
         CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression());
         int uncompressedSize = msgMetadata.getUncompressedSize();
         int payloadSize = payload.readableBytes();
         if (payloadSize > PulsarDecoder.MaxMessageSize) {
             // payload size is itself corrupted since it cannot be bigger than the MaxMessageSize
-            log.error("[{}][{}] Got corrupted payload message size {} at {}", topic, subscription, payloadSize,
-                    messageId);
+            log.error("[{}] Got corrupted payload message size {} at {}:{}", topic, payloadSize,
+                    ledgerId, entryId);
             return null;
         }
 
@@ -145,15 +128,14 @@ public class MessageParser {
             ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
             return uncompressedPayload;
         } catch (IOException e) {
-            log.error("[{}][{}] Failed to decompress message with {} at {}: {}", topic, subscription,
-                    msgMetadata.getCompression(), messageId, e.getMessage(), e);
+            log.error("[{}] Failed to decompress message with {} at {}:{} : {}", topic,
+                    msgMetadata.getCompression(), ledgerId, entryId, e.getMessage(), e);
             return null;
         }
     }
 
-    public static void receiveIndividualMessagesFromBatch(String topic, MessageMetadata msgMetadata,
-            ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx,
-            int partitionIndex, MessageProcessor processor) {
+    private static void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata,
+            ByteBuf uncompressedPayload, long ledgerId, long entryId, MessageProcessor processor) {
         int batchSize = msgMetadata.getNumMessagesInBatch();
 
         try {
@@ -167,20 +149,11 @@ public class MessageParser {
                     // message has been compacted out, so don't send to the user
                     singleMessagePayload.release();
                     singleMessageMetadataBuilder.recycle();
-
                     continue;
                 }
 
-                BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
-                        messageId.getEntryId(), partitionIndex, i, null);
-                final MessageImpl<?> message = new MessageImpl<>(
-                        topic, batchMessageIdImpl, msgMetadata,
-                        singleMessageMetadataBuilder.build(), singleMessagePayload, Optional.empty(), cnx, null);
-
-                processor.process(batchMessageIdImpl, message, singleMessagePayload);
-
-                singleMessagePayload.release();
-                singleMessageMetadataBuilder.recycle();
+                processor.process(RawMessageImpl.get(msgMetadata, singleMessageMetadataBuilder, singleMessagePayload,
+                        ledgerId, entryId, i));
             }
         } catch (IOException e) {
             log.warn("Unable to obtain messages in batch", e);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
new file mode 100644
index 0000000..512dfef
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.api.raw;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * View of a message that exposes the internal direct-memory buffer for more efficient processing.
+ *
+ * The message needs to be released when the processing is done.
+ */
+public interface RawMessage {
+
+    /**
+     * Release all the resources associated with this raw message
+     */
+    void release();
+
+    /**
+     * Return the properties attached to the message.
+     *
+     * Properties are application defined key/value pairs that will be attached to the message
+     *
+     * @return an unmodifiable view of the properties map
+     */
+    Map<String, String> getProperties();
+
+    /**
+     * Get the content of the message
+     *
+     * @return the byte array with the message payload
+     */
+    ByteBuf getData();
+
+    /**
+     * Get the unique message ID associated with this message.
+     *
+     * The message id can be used to univocally refer to a message without having the keep the entire payload in memory.
+     *
+     * Only messages received from the consumer will have a message id assigned.
+     *
+     * @return the message id null if this message was not received by this client instance
+     */
+    RawMessageId getMessageId();
+
+    /**
+     * Get the publish time of this message. The publish time is the timestamp that a client publish the message.
+     *
+     * @return publish time of this message.
+     * @see #getEventTime()
+     */
+    long getPublishTime();
+
+    /**
+     * Get the event time associated with this message. It is typically set by the applications via
+     * {@link MessageBuilder#setEventTime(long)}.
+     *
+     * <p>
+     * If there isn't any event time associated with this event, it will return 0.
+     */
+    long getEventTime();
+
+    /**
+     * Get the sequence id associated with this message. It is typically set by the applications via
+     * {@link MessageBuilder#setSequenceId(long)}.
+     *
+     * @return sequence id associated with this message.
+     * @see MessageBuilder#setEventTime(long)
+     */
+    long getSequenceId();
+
+    /**
+     * Get the producer name who produced this message.
+     *
+     * @return producer name who produced this message, null if producer name is not set.
+     */
+    String getProducerName();
+
+    /**
+     * Get the key of the message
+     *
+     * @return the key of the message
+     */
+    Optional<String> getKey();
+}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
similarity index 82%
copy from pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
index 6727b29..1151443 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
@@ -16,12 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.sql.presto;
+package org.apache.pulsar.common.api.raw;
 
-public interface SchemaHandler {
-
-    Object deserialize(byte[] bytes);
-
-    Object extractField(int index, Object currentRecord);
+public interface RawMessageId {
 
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageIdImpl.java
similarity index 62%
copy from pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageIdImpl.java
index 6727b29..d388cc7 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageIdImpl.java
@@ -16,12 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.sql.presto;
+package org.apache.pulsar.common.api.raw;
 
-public interface SchemaHandler {
+public class RawMessageIdImpl implements RawMessageId {
 
-    Object deserialize(byte[] bytes);
-
-    Object extractField(int index, Object currentRecord);
+    long ledgerId;
+    long entryId;
+    long batchIndex;
 
+    @Override
+    public String toString() {
+        return new StringBuilder()
+                .append('(')
+                .append(ledgerId)
+                .append(',')
+                .append(entryId)
+                .append(',')
+                .append(batchIndex)
+                .append(')')
+                .toString();
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
new file mode 100644
index 0000000..6915124
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.api.raw;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+
+public class RawMessageImpl implements RawMessage {
+
+    private final RawMessageIdImpl messageId = new RawMessageIdImpl();
+
+    private MessageMetadata msgMetadata;
+    private PulsarApi.SingleMessageMetadata.Builder singleMessageMetadata;
+    private ByteBuf payload;
+
+    private static final Recycler<RawMessageImpl> RECYCLER = new Recycler<RawMessageImpl>() {
+        @Override
+        protected RawMessageImpl newObject(Handle<RawMessageImpl> handle) {
+            return new RawMessageImpl(handle);
+        }
+    };
+
+    private final Handle<RawMessageImpl> handle;
+
+    private RawMessageImpl(Handle<RawMessageImpl> handle) {
+        this.handle = handle;
+    }
+
+    @Override
+    public void release() {
+        if (singleMessageMetadata != null) {
+            singleMessageMetadata.recycle();
+            singleMessageMetadata = null;
+        }
+
+        payload.release();
+        handle.recycle(this);
+    }
+
+    public static RawMessage get(MessageMetadata msgMetadata,
+            PulsarApi.SingleMessageMetadata.Builder singleMessageMetadata,
+            ByteBuf payload,
+            long ledgerId, long entryId, long batchIndex) {
+        RawMessageImpl msg = RECYCLER.get();
+        msg.msgMetadata = msgMetadata;
+        msg.singleMessageMetadata = singleMessageMetadata;
+        msg.messageId.ledgerId = ledgerId;
+        msg.messageId.entryId = entryId;
+        msg.messageId.batchIndex = batchIndex;
+        msg.payload = payload;
+        return msg;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) {
+            return singleMessageMetadata.getPropertiesList().stream()
+                    .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
+        } else if (msgMetadata.getPropertiesCount() > 0) {
+            return msgMetadata.getPropertiesList().stream()
+                    .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
+        } else {
+            return Collections.emptyMap();
+        }
+    }
+
+    @Override
+    public ByteBuf getData() {
+        return payload;
+    }
+
+    @Override
+    public RawMessageId getMessageId() {
+        return messageId;
+    }
+
+    @Override
+    public long getPublishTime() {
+        return msgMetadata.getPublishTime();
+    }
+
+    @Override
+    public long getEventTime() {
+        if (singleMessageMetadata != null && singleMessageMetadata.hasEventTime()) {
+            return singleMessageMetadata.getEventTime();
+        } else if (msgMetadata.hasEventTime()) {
+            return msgMetadata.getEventTime();
+        } else {
+            return 0;
+        }
+    }
+
+    @Override
+    public long getSequenceId() {
+        return msgMetadata.getSequenceId() + messageId.batchIndex;
+    }
+
+    @Override
+    public String getProducerName() {
+        return msgMetadata.getProducerName();
+    }
+
+    @Override
+    public Optional<String> getKey() {
+        if (singleMessageMetadata != null && singleMessageMetadata.hasPartitionKey()) {
+            return Optional.of(singleMessageMetadata.getPartitionKey());
+        } else if (msgMetadata.hasPartitionKey()){
+            return Optional.of(msgMetadata.getPartitionKey());
+        } else {
+            return Optional.empty();
+        }
+    }
+}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
index eebfdbe..ed3759a 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
@@ -19,6 +19,10 @@
 package org.apache.pulsar.sql.presto;
 
 import io.airlift.log.Logger;
+
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBufAllocator;
+import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
 import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
@@ -47,16 +51,27 @@ public class AvroSchemaHandler implements SchemaHandler {
     }
 
     @Override
-    public Object deserialize(byte[] bytes) {
+    public Object deserialize(ByteBuf payload) {
+
+        ByteBuf heapBuffer = null;
         try {
             BinaryDecoder decoderFromCache = decoders.get();
-            BinaryDecoder decoder=DecoderFactory.get().binaryDecoder(bytes, decoderFromCache);
+
+            // Make a copy into a heap buffer, since Avro cannot deserialize directly from direct memory
+            int size = payload.readableBytes();
+            heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer(size, size);
+            heapBuffer.writeBytes(payload);
+
+            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(heapBuffer.array(), heapBuffer.arrayOffset(),
+                    heapBuffer.readableBytes(), decoderFromCache);
             if (decoderFromCache==null) {
                 decoders.set(decoder);
             }
             return this.datumReader.read(null, decoder);
         } catch (IOException e) {
             log.error(e);
+        } finally {
+            ReferenceCountUtil.safeRelease(heapBuffer);
         }
         return null;
     }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
index c941e58..ae1a7c4 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.sql.presto;
 
 import com.dslplatform.json.DslJson;
 import com.facebook.presto.spi.type.Type;
+
 import io.airlift.log.Logger;
 
 import java.io.IOException;
@@ -27,6 +28,9 @@ import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
+
 public class JSONSchemaHandler implements SchemaHandler {
 
     private static final Logger log = Logger.get(JSONSchemaHandler.class);
@@ -35,24 +39,43 @@ public class JSONSchemaHandler implements SchemaHandler {
 
     private final DslJson<Object> dslJson = new DslJson<>();
 
+    private static final FastThreadLocal<byte[]> tmpBuffer = new FastThreadLocal<byte[]>() {
+        @Override
+        protected byte[] initialValue() {
+            return new byte[1024];
+        }
+    };
+
     public JSONSchemaHandler(List<PulsarColumnHandle> columnHandles) {
         this.columnHandles = columnHandles;
     }
 
     @Override
-    public Object deserialize(byte[] bytes) {
+    public Object deserialize(ByteBuf payload) {
+        // Since JSON deserializer only works on a byte[] we need to convert a direct mem buffer into
+        // a byte[].
+        int size = payload.readableBytes();
+        byte[] buffer = tmpBuffer.get();
+        if (buffer.length < size) {
+            // If the thread-local buffer is not big enough, replace it with
+            // a bigger one
+            buffer = new byte[size * 2];
+            tmpBuffer.set(buffer);
+        }
+
+        payload.readBytes(buffer, 0, size);
+
         try {
-            return dslJson.deserialize(Map.class, bytes, bytes.length);
+            return dslJson.deserialize(Map.class, buffer, size);
         } catch (IOException e) {
-            log.error(e);
+            log.error("Failed to deserialize Json object", e);
+            return null;
         }
-        return null;
     }
 
     @Override
     public Object extractField(int index, Object currentRecord) {
         try {
-
             Map jsonObject = (Map) currentRecord;
             PulsarColumnHandle pulsarColumnHandle = columnHandles.get(index);
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
index 76da585..ca74e39 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.api.raw.RawMessage;
 
 import java.util.Map;
 import java.util.Set;
@@ -46,7 +46,7 @@ public abstract class PulsarInternalColumn {
         }
 
         @Override
-        public Object getData(Message message) {
+        public Object getData(RawMessage message) {
             return message.getEventTime() == 0 ? null : message.getEventTime();
         }
     }
@@ -58,7 +58,7 @@ public abstract class PulsarInternalColumn {
         }
 
         @Override
-        public Object getData(Message message) {
+        public Object getData(RawMessage message) {
             return message.getPublishTime();
         }
     }
@@ -70,7 +70,7 @@ public abstract class PulsarInternalColumn {
         }
 
         @Override
-        public Object getData(Message message) {
+        public Object getData(RawMessage message) {
             return message.getMessageId().toString();
         }
     }
@@ -82,7 +82,7 @@ public abstract class PulsarInternalColumn {
         }
 
         @Override
-        public Object getData(Message message) {
+        public Object getData(RawMessage message) {
             return message.getSequenceId();
         }
     }
@@ -94,7 +94,7 @@ public abstract class PulsarInternalColumn {
         }
 
         @Override
-        public Object getData(Message message) {
+        public Object getData(RawMessage message) {
             return message.getProducerName();
         }
     }
@@ -106,8 +106,8 @@ public abstract class PulsarInternalColumn {
         }
 
         @Override
-        public Object getData(Message message) {
-            return message.hasKey() ? message.getKey() : null;
+        public Object getData(RawMessage message) {
+            return message.getKey().orElse(null);
         }
     }
 
@@ -121,7 +121,7 @@ public abstract class PulsarInternalColumn {
         }
 
         @Override
-        public Object getData(Message message) {
+        public Object getData(RawMessage message) {
             try {
                 return mapper.writeValueAsString(message.getProperties());
             } catch (JsonProcessingException e) {
@@ -201,5 +201,5 @@ public abstract class PulsarInternalColumn {
         return builder.build();
     }
 
-    public abstract Object getData(Message message);
+    public abstract Object getData(RawMessage message);
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index e2198a8..9427f20 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -18,15 +18,35 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
+import static com.facebook.presto.spi.type.BigintType.BIGINT;
+import static com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
+import static com.facebook.presto.spi.type.DateType.DATE;
+import static com.facebook.presto.spi.type.IntegerType.INTEGER;
+import static com.facebook.presto.spi.type.RealType.REAL;
+import static com.facebook.presto.spi.type.SmallintType.SMALLINT;
+import static com.facebook.presto.spi.type.TimeType.TIME;
+import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
+import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
+import static com.facebook.presto.spi.type.TinyintType.TINYINT;
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.RecordCursor;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarbinaryType;
 import com.facebook.presto.spi.type.VarcharType;
 import com.google.common.annotations.VisibleForTesting;
+
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.avro.Schema;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
@@ -36,32 +56,14 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.impl.MessageParser;
+import org.apache.pulsar.common.api.raw.MessageParser;
+import org.apache.pulsar.common.api.raw.RawMessage;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.SpscArrayQueue;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
-import static com.facebook.presto.spi.type.BigintType.BIGINT;
-import static com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
-import static com.facebook.presto.spi.type.DateType.DATE;
-import static com.facebook.presto.spi.type.IntegerType.INTEGER;
-import static com.facebook.presto.spi.type.RealType.REAL;
-import static com.facebook.presto.spi.type.SmallintType.SMALLINT;
-import static com.facebook.presto.spi.type.TimeType.TIME;
-import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
-import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
-import static com.facebook.presto.spi.type.TinyintType.TINYINT;
-import static com.google.common.base.Preconditions.checkArgument;
-
 
 public class PulsarRecordCursor implements RecordCursor {
 
@@ -69,10 +71,10 @@ public class PulsarRecordCursor implements RecordCursor {
     private PulsarSplit pulsarSplit;
     private PulsarConnectorConfig pulsarConnectorConfig;
     private ReadOnlyCursor cursor;
-    private SpscArrayQueue<Message> messageQueue;
+    private SpscArrayQueue<RawMessage> messageQueue;
     private SpscArrayQueue<Entry> entryQueue;
     private Object currentRecord;
-    private Message currentMessage;
+    private RawMessage currentMessage;
     private Map<String, PulsarInternalColumn> internalColumnMap = PulsarInternalColumn.getInternalFieldsMap();
     private SchemaHandler schemaHandler;
     private int maxBatchSize;
@@ -126,8 +128,8 @@ public class PulsarRecordCursor implements RecordCursor {
         this.pulsarSplit = pulsarSplit;
         this.pulsarConnectorConfig = pulsarConnectorConfig;
         this.maxBatchSize = pulsarConnectorConfig.getMaxEntryReadBatchSize();
-        this.messageQueue = new SpscArrayQueue(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
-        this.entryQueue = new SpscArrayQueue(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
+        this.messageQueue = new SpscArrayQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
+        this.entryQueue = new SpscArrayQueue<>(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
         this.topicName = TopicName.get("persistent",
                 NamespaceName.get(pulsarSplit.getSchemaName()),
                 pulsarSplit.getTableName());
@@ -236,7 +238,7 @@ public class PulsarRecordCursor implements RecordCursor {
 
                             try {
                                 MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
-                                        entry.getDataBuffer(), (messageId, message, byteBuf) -> {
+                                        entry.getDataBuffer(), (message) -> {
                                             try {
                                                 // start time for message queue read
                                                 metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
@@ -339,7 +341,7 @@ public class PulsarRecordCursor implements RecordCursor {
             metricsTracker.incr_NUM_ENTRIES_PER_BATCH_SUCCESS(entries.size());
         }
 
-        public boolean hashFinished() {
+        public boolean hasFinished() {
             return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >=1 && splitSize <= entriesProcessed;
         }
 
@@ -355,7 +357,6 @@ public class PulsarRecordCursor implements RecordCursor {
         }
     }
 
-
     @Override
     public boolean advanceNextPosition() {
 
@@ -368,8 +369,13 @@ public class PulsarRecordCursor implements RecordCursor {
             readEntries.run();
         }
 
+        if (currentMessage != null) {
+            currentMessage.release();
+            currentMessage = null;
+        }
+
         while(true) {
-            if (readEntries.hashFinished()) {
+            if (readEntries.hasFinished()) {
                 return false;
             }
 
@@ -495,6 +501,14 @@ public class PulsarRecordCursor implements RecordCursor {
 
     @Override
     public void close() {
+        log.info("Closing cursor record");
+
+        if (currentMessage != null) {
+            currentMessage.release();
+        }
+
+        messageQueue.drain(RawMessage::release);
+        entryQueue.drain(Entry::release);
 
         if (deserializeEntries != null) {
             deserializeEntries.interrupt();
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
index 4bd1171..e5688ae 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
@@ -47,6 +47,9 @@ public class PulsarSplit implements ConnectorSplit {
     private final long endPositionLedgerId;
     private final TupleDomain<ColumnHandle> tupleDomain;
 
+    private final PositionImpl startPosition;
+    private final PositionImpl endPosition;
+
     @JsonCreator
     public PulsarSplit(
             @JsonProperty("splitId") long splitId,
@@ -73,6 +76,8 @@ public class PulsarSplit implements ConnectorSplit {
         this.startPositionLedgerId = startPositionLedgerId;
         this.endPositionLedgerId = endPositionLedgerId;
         this.tupleDomain = requireNonNull(tupleDomain, "tupleDomain is null");
+        this.startPosition = PositionImpl.get(startPositionLedgerId, startPositionEntryId);
+        this.endPosition = PositionImpl.get(endPositionLedgerId, endPositionEntryId);
     }
 
     @JsonProperty
@@ -136,11 +141,11 @@ public class PulsarSplit implements ConnectorSplit {
     }
 
     public PositionImpl getStartPosition() {
-        return PositionImpl.get(startPositionLedgerId, startPositionEntryId);
+        return startPosition;
     }
 
     public PositionImpl getEndPosition() {
-        return PositionImpl.get(endPositionLedgerId, endPositionEntryId);
+        return endPosition;
     }
 
     @Override
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
index 6727b29..3cabc8a 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+
 public interface SchemaHandler {
 
-    Object deserialize(byte[] bytes);
+    Object deserialize(ByteBuf payload);
 
     Object extractField(int index, Object currentRecord);