You are viewing a plain-text version of this content; the link to the canonical version of this message was not preserved in this copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/10 19:09:17 UTC
[pulsar] branch master updated: Use raw message when manually parsing messages from topic storage (#3146)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bddfa2a Use raw message when manually parsing messages from topic storage (#3146)
bddfa2a is described below
commit bddfa2a5a233f17b93cb756b289b40d587321e4c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Dec 10 11:09:12 2018 -0800
Use raw message when manually parsing messages from topic storage (#3146)
* Use raw message when manually parsing messages from topic storage
* Added missing headers
* Fixed copy to byte[]
---
.../pulsar/client/impl/MessageParserTest.java | 30 ++---
.../pulsar/common/api/raw}/MessageParser.java | 71 ++++-------
.../apache/pulsar/common/api/raw/RawMessage.java | 104 +++++++++++++++
.../apache/pulsar/common/api/raw/RawMessageId.java | 8 +-
.../pulsar/common/api/raw/RawMessageIdImpl.java | 22 +++-
.../pulsar/common/api/raw/RawMessageImpl.java | 139 +++++++++++++++++++++
.../pulsar/sql/presto/AvroSchemaHandler.java | 19 ++-
.../pulsar/sql/presto/JSONSchemaHandler.java | 33 ++++-
.../pulsar/sql/presto/PulsarInternalColumn.java | 20 +--
.../pulsar/sql/presto/PulsarRecordCursor.java | 70 ++++++-----
.../org/apache/pulsar/sql/presto/PulsarSplit.java | 9 +-
.../apache/pulsar/sql/presto/SchemaHandler.java | 4 +-
12 files changed, 404 insertions(+), 125 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
index a16166a..5ec6332 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
@@ -22,7 +22,7 @@ import static org.testng.Assert.assertEquals;
import com.google.common.collect.Sets;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -32,12 +32,10 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.raw.MessageParser;
+import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -66,12 +64,6 @@ public class MessageParserTest extends MockedPulsarServiceBaseTest {
super.internalCleanup();
}
- public static String extractKey(RawMessage m) throws Exception {
- ByteBuf headersAndPayload = m.getHeadersAndPayload();
- MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
- return msgMetadata.getPartitionKey();
- }
-
@Test
public void testWithoutBatches() throws Exception {
String topic = "persistent://my-tenant/my-ns/my-topic";
@@ -93,11 +85,11 @@ public class MessageParserTest extends MockedPulsarServiceBaseTest {
for (int i = 0; i < n; i++) {
Entry entry = cursor.readEntriesOrWait(1).get(0);
- List<Message<?>> messages = Lists.newArrayList();
+ List<RawMessage> messages = Lists.newArrayList();
try {
MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer(),
- (messageId, message, payload) -> {
+ (message) -> {
messages.add(message);
});
} finally {
@@ -106,7 +98,9 @@ public class MessageParserTest extends MockedPulsarServiceBaseTest {
assertEquals(messages.size(), 1);
- assertEquals(messages.get(0).getData(), ("hello-" + i).getBytes());
+ assertEquals(messages.get(0).getData(), Unpooled.wrappedBuffer(("hello-" + i).getBytes()));
+
+ messages.forEach(RawMessage::release);
}
}
@@ -133,11 +127,11 @@ public class MessageParserTest extends MockedPulsarServiceBaseTest {
assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
Entry entry = cursor.readEntriesOrWait(1).get(0);
- List<Message<?>> messages = Lists.newArrayList();
+ List<RawMessage> messages = Lists.newArrayList();
try {
MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer(),
- (messageId, message, payload) -> {
+ (message) -> {
messages.add(message);
});
} finally {
@@ -147,9 +141,11 @@ public class MessageParserTest extends MockedPulsarServiceBaseTest {
assertEquals(messages.size(), 10);
for (int i = 0; i < n; i++) {
- assertEquals(messages.get(i).getData(), ("hello-" + i).getBytes());
+ assertEquals(messages.get(i).getData(), Unpooled.wrappedBuffer(("hello-" + i).getBytes()));
}
+ messages.forEach(RawMessage::release);
+
producer.close();
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
similarity index 61%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
index d68ce75..1280837 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.common.api.raw;
import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
import static org.apache.pulsar.common.api.Commands.hasChecksum;
@@ -25,17 +25,13 @@ import static org.apache.pulsar.common.api.Commands.readChecksum;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
-import java.util.Optional;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
@@ -45,7 +41,7 @@ import org.apache.pulsar.common.naming.TopicName;
@Slf4j
public class MessageParser {
public interface MessageProcessor {
- void process(MessageId messageId, Message<?> message, ByteBuf payload);
+ void process(RawMessage message);
}
/**
@@ -54,19 +50,12 @@ public class MessageParser {
*/
public static void parseMessage(TopicName topicName, long ledgerId, long entryId, ByteBuf headersAndPayload,
MessageProcessor processor) throws IOException {
- MessageIdImpl msgId = new MessageIdImpl(ledgerId, entryId, -1);
-
- MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
- messageIdBuilder.setLedgerId(ledgerId);
- messageIdBuilder.setEntryId(entryId);
- MessageIdData messageId = messageIdBuilder.build();
-
MessageMetadata msgMetadata = null;
ByteBuf payload = headersAndPayload;
ByteBuf uncompressedPayload = null;
try {
- if (!verifyChecksum(headersAndPayload, messageId, topicName.toString(), "reader")) {
+ if (!verifyChecksum(topicName, headersAndPayload, ledgerId, entryId)) {
// discard message with checksum error
return;
}
@@ -74,7 +63,7 @@ public class MessageParser {
try {
msgMetadata = Commands.parseMessageMetadata(payload);
} catch (Throwable t) {
- log.warn("[{}] Failed to deserialize metadata for message {} - Ignoring", topicName, messageId);
+ log.warn("[{}] Failed to deserialize metadata for message {}:{} - Ignoring", topicName, ledgerId, entryId);
return;
}
@@ -82,8 +71,8 @@ public class MessageParser {
throw new IOException("Cannot parse encrypted message " + msgMetadata + " on topic " + topicName);
}
- uncompressedPayload = uncompressPayloadIfNeeded(messageId, msgMetadata, headersAndPayload,
- topicName.toString(), "reader");
+ uncompressedPayload = uncompressPayloadIfNeeded(topicName, msgMetadata, headersAndPayload, ledgerId,
+ entryId);
if (uncompressedPayload == null) {
// Message was discarded on decompression error
@@ -93,35 +82,29 @@ public class MessageParser {
final int numMessages = msgMetadata.getNumMessagesInBatch();
if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
- final MessageImpl<?> message = new MessageImpl<>(topicName.toString(),
- msgId, msgMetadata, uncompressedPayload,
- null, null);
- processor.process(msgId, message, uncompressedPayload);
+ // RawMessage.release() releases the payload and the finally block below releases uncompressedPayload
+ // as well, so the message must be given its own reference
+ processor.process(RawMessageImpl.get(msgMetadata, null, uncompressedPayload.retain(), ledgerId, entryId, 0));
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
- receiveIndividualMessagesFromBatch(topicName.toString(), msgMetadata, uncompressedPayload, messageId, null, -1, processor);
+ receiveIndividualMessagesFromBatch(msgMetadata, uncompressedPayload, ledgerId, entryId, processor);
}
} finally {
if (uncompressedPayload != null) {
uncompressedPayload.release();
}
- messageIdBuilder.recycle();
- messageId.recycle();
- msgMetadata.recycle();
+ // msgMetadata is still null when the checksum check or the metadata parsing failed above
+ if (msgMetadata != null) {
+ // NOTE(review): RawMessage instances handed to the processor still reference msgMetadata, so their
+ // metadata-backed getters (publish time, key, properties, ...) should not be used after this returns
+ msgMetadata.recycle();
+ }
}
}
- public static boolean verifyChecksum(ByteBuf headersAndPayload, MessageIdData messageId, String topic,
- String subscription) {
+ public static boolean verifyChecksum(TopicName topic, ByteBuf headersAndPayload, long ledgerId, long entryId) {
if (hasChecksum(headersAndPayload)) {
int checksum = readChecksum(headersAndPayload);
int computedChecksum = computeChecksum(headersAndPayload);
if (checksum != computedChecksum) {
log.error(
- "[{}][{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}",
- topic, subscription, messageId.getLedgerId(), messageId.getEntryId(),
- Long.toHexString(checksum), Integer.toHexString(computedChecksum));
+ "[{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}",
+ topic, ledgerId, entryId, Long.toHexString(checksum), Integer.toHexString(computedChecksum));
return false;
}
}
@@ -129,15 +112,15 @@ public class MessageParser {
return true;
}
- public static ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata,
- ByteBuf payload, String topic, String subscription) {
+ public static ByteBuf uncompressPayloadIfNeeded(TopicName topic, MessageMetadata msgMetadata,
+ ByteBuf payload, long ledgerId, long entryId) {
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression());
int uncompressedSize = msgMetadata.getUncompressedSize();
int payloadSize = payload.readableBytes();
if (payloadSize > PulsarDecoder.MaxMessageSize) {
// payload size is itself corrupted since it cannot be bigger than the MaxMessageSize
- log.error("[{}][{}] Got corrupted payload message size {} at {}", topic, subscription, payloadSize,
- messageId);
+ log.error("[{}] Got corrupted payload message size {} at {}:{}", topic, payloadSize,
+ ledgerId, entryId);
return null;
}
@@ -145,15 +128,14 @@ public class MessageParser {
ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
return uncompressedPayload;
} catch (IOException e) {
- log.error("[{}][{}] Failed to decompress message with {} at {}: {}", topic, subscription,
- msgMetadata.getCompression(), messageId, e.getMessage(), e);
+ log.error("[{}] Failed to decompress message with {} at {}:{} : {}", topic,
+ msgMetadata.getCompression(), ledgerId, entryId, e.getMessage(), e);
return null;
}
}
- public static void receiveIndividualMessagesFromBatch(String topic, MessageMetadata msgMetadata,
- ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx,
- int partitionIndex, MessageProcessor processor) {
+ private static void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata,
+ ByteBuf uncompressedPayload, long ledgerId, long entryId, MessageProcessor processor) {
int batchSize = msgMetadata.getNumMessagesInBatch();
try {
@@ -167,20 +149,11 @@ public class MessageParser {
// message has been compacted out, so don't send to the user
singleMessagePayload.release();
singleMessageMetadataBuilder.recycle();
-
continue;
}
- BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
- messageId.getEntryId(), partitionIndex, i, null);
- final MessageImpl<?> message = new MessageImpl<>(
- topic, batchMessageIdImpl, msgMetadata,
- singleMessageMetadataBuilder.build(), singleMessagePayload, Optional.empty(), cnx, null);
-
- processor.process(batchMessageIdImpl, message, singleMessagePayload);
-
- singleMessagePayload.release();
- singleMessageMetadataBuilder.recycle();
+ processor.process(RawMessageImpl.get(msgMetadata, singleMessageMetadataBuilder, singleMessagePayload,
+ ledgerId, entryId, i));
}
} catch (IOException e) {
log.warn("Unable to obtain messages in batch", e);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
new file mode 100644
index 0000000..512dfef
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.api.raw;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * View of a message that exposes the internal direct-memory buffer for more efficient processing.
+ *
+ * The message needs to be released, by calling {@link #release()}, when the processing is done.
+ */
+public interface RawMessage {
+
+ /**
+ * Release all the resources associated with this raw message.
+ *
+ * <p>The buffer returned by {@link #getData()} must not be accessed after this call.
+ */
+ void release();
+
+ /**
+ * Return the properties attached to the message.
+ *
+ * Properties are application-defined key/value pairs that are attached to the message.
+ *
+ * @return an unmodifiable view of the properties map
+ */
+ Map<String, String> getProperties();
+
+ /**
+ * Get the content of the message
+ *
+ * @return the buffer holding the message payload, only valid until {@link #release()} is called
+ */
+ ByteBuf getData();
+
+ /**
+ * Get the unique message ID associated with this message.
+ *
+ * The message id can be used to uniquely refer to a message without having to keep the entire payload in memory.
+ *
+ * For messages obtained from {@link MessageParser}, the id is built from the ledger id, entry id and batch index.
+ *
+ * @return the message id
+ */
+ RawMessageId getMessageId();
+
+ /**
+ * Get the publish time of this message. The publish time is the timestamp at which a client published the message.
+ *
+ * @return publish time of this message.
+ * @see #getEventTime()
+ */
+ long getPublishTime();
+
+ /**
+ * Get the event time associated with this message. It is typically set by the producing
+ * application.
+ *
+ * @return the event time of this message, or 0 if there isn't any event time associated with it
+ */
+ long getEventTime();
+
+ /**
+ * Get the sequence id associated with this message. It is typically set by the producing
+ * application.
+ *
+ * @return sequence id associated with this message.
+ */
+ long getSequenceId();
+
+ /**
+ * Get the producer name who produced this message.
+ *
+ * @return producer name who produced this message, null if producer name is not set.
+ */
+ String getProducerName();
+
+ /**
+ * Get the key of the message
+ *
+ * @return the key of the message, or an empty {@link Optional} if the message has no key
+ */
+ Optional<String> getKey();
+}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
similarity index 82%
copy from pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
index 6727b29..1151443 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageId.java
@@ -16,12 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.sql.presto;
+package org.apache.pulsar.common.api.raw;
-public interface SchemaHandler {
-
- Object deserialize(byte[] bytes);
-
- Object extractField(int index, Object currentRecord);
+public interface RawMessageId {
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageIdImpl.java
similarity index 62%
copy from pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageIdImpl.java
index 6727b29..d388cc7 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageIdImpl.java
@@ -16,12 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.sql.presto;
+package org.apache.pulsar.common.api.raw;
-public interface SchemaHandler {
+public class RawMessageIdImpl implements RawMessageId {
- Object deserialize(byte[] bytes);
-
- Object extractField(int index, Object currentRecord);
+ long ledgerId;
+ long entryId;
+ long batchIndex;
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append('(')
+ .append(ledgerId)
+ .append(',')
+ .append(entryId)
+ .append(',')
+ .append(batchIndex)
+ .append(')')
+ .toString();
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
new file mode 100644
index 0000000..6915124
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.api.raw;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+
+/**
+ * Pooled {@link RawMessage} implementation. Instances are obtained through {@link #get} and go back to the
+ * recycler when {@link #release()} is called.
+ */
+public class RawMessageImpl implements RawMessage {
+
+ private final RawMessageIdImpl messageId = new RawMessageIdImpl();
+
+ private MessageMetadata msgMetadata;
+ private PulsarApi.SingleMessageMetadata.Builder singleMessageMetadata;
+ private ByteBuf payload;
+
+ private static final Recycler<RawMessageImpl> RECYCLER = new Recycler<RawMessageImpl>() {
+ @Override
+ protected RawMessageImpl newObject(Handle<RawMessageImpl> handle) {
+ return new RawMessageImpl(handle);
+ }
+ };
+
+ private final Handle<RawMessageImpl> handle;
+
+ private RawMessageImpl(Handle<RawMessageImpl> handle) {
+ this.handle = handle;
+ }
+
+ @Override
+ public void release() {
+ if (singleMessageMetadata != null) {
+ singleMessageMetadata.recycle();
+ singleMessageMetadata = null;
+ }
+
+ payload.release();
+ // Clear the references so the pooled instance does not keep the released buffer or the metadata reachable
+ payload = null;
+ msgMetadata = null;
+ handle.recycle(this);
+ }
+
+ /**
+ * Get a pooled instance wrapping a single message of an entry.
+ *
+ * @param msgMetadata metadata of the entry; only referenced, it is not recycled by {@link #release()}
+ * @param singleMessageMetadata metadata of the message within a batch, or {@code null} if the entry is not a
+ * batch; recycled by {@link #release()}
+ * @param payload payload of the message; ownership is transferred and it is released by {@link #release()}
+ * @param ledgerId ledger id of the entry
+ * @param entryId entry id of the entry
+ * @param batchIndex index of the message within the batch
+ * @return the raw message, which must be released once processing is done
+ */
+ public static RawMessage get(MessageMetadata msgMetadata,
+ PulsarApi.SingleMessageMetadata.Builder singleMessageMetadata,
+ ByteBuf payload,
+ long ledgerId, long entryId, long batchIndex) {
+ RawMessageImpl msg = RECYCLER.get();
+ msg.msgMetadata = msgMetadata;
+ msg.singleMessageMetadata = singleMessageMetadata;
+ msg.messageId.ledgerId = ledgerId;
+ msg.messageId.entryId = entryId;
+ msg.messageId.batchIndex = batchIndex;
+ msg.payload = payload;
+ return msg;
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ // A message within a batch that has its own properties uses them; otherwise use the entry-level ones
+ List<KeyValue> properties = (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0)
+ ? singleMessageMetadata.getPropertiesList() : msgMetadata.getPropertiesList();
+ if (properties.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ // Read-only as documented by RawMessage; a repeated key keeps its last value instead of making
+ // Collectors.toMap throw IllegalStateException
+ return Collections.unmodifiableMap(properties.stream()
+ .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue, (first, last) -> last)));
+ }
+
+ @Override
+ public ByteBuf getData() {
+ return payload;
+ }
+
+ @Override
+ public RawMessageId getMessageId() {
+ return messageId;
+ }
+
+ @Override
+ public long getPublishTime() {
+ return msgMetadata.getPublishTime();
+ }
+
+ @Override
+ public long getEventTime() {
+ if (singleMessageMetadata != null && singleMessageMetadata.hasEventTime()) {
+ return singleMessageMetadata.getEventTime();
+ } else if (msgMetadata.hasEventTime()) {
+ return msgMetadata.getEventTime();
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public long getSequenceId() {
+ return msgMetadata.getSequenceId() + messageId.batchIndex;
+ }
+
+ @Override
+ public String getProducerName() {
+ return msgMetadata.getProducerName();
+ }
+
+ @Override
+ public Optional<String> getKey() {
+ if (singleMessageMetadata != null && singleMessageMetadata.hasPartitionKey()) {
+ return Optional.of(singleMessageMetadata.getPartitionKey());
+ } else if (msgMetadata.hasPartitionKey()){
+ return Optional.of(msgMetadata.getPartitionKey());
+ } else {
+ return Optional.empty();
+ }
+ }
+}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
index eebfdbe..ed3759a 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
@@ -19,6 +19,10 @@
package org.apache.pulsar.sql.presto;
import io.airlift.log.Logger;
+
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBufAllocator;
+import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
@@ -47,16 +51,27 @@ public class AvroSchemaHandler implements SchemaHandler {
}
@Override
- public Object deserialize(byte[] bytes) {
+ public Object deserialize(ByteBuf payload) {
+
+ ByteBuf heapBuffer = null;
try {
BinaryDecoder decoderFromCache = decoders.get();
- BinaryDecoder decoder=DecoderFactory.get().binaryDecoder(bytes, decoderFromCache);
+
+ // Make a copy into a heap buffer, since Avro cannot deserialize directly from direct memory
+ int size = payload.readableBytes();
+ heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer(size, size);
+ heapBuffer.writeBytes(payload);
+
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(heapBuffer.array(), heapBuffer.arrayOffset(),
+ heapBuffer.readableBytes(), decoderFromCache);
if (decoderFromCache==null) {
decoders.set(decoder);
}
return this.datumReader.read(null, decoder);
} catch (IOException e) {
log.error(e);
+ } finally {
+ ReferenceCountUtil.safeRelease(heapBuffer);
}
return null;
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
index c941e58..ae1a7c4 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.sql.presto;
import com.dslplatform.json.DslJson;
import com.facebook.presto.spi.type.Type;
+
import io.airlift.log.Logger;
import java.io.IOException;
@@ -27,6 +28,9 @@ import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
+
public class JSONSchemaHandler implements SchemaHandler {
private static final Logger log = Logger.get(JSONSchemaHandler.class);
@@ -35,24 +39,43 @@ public class JSONSchemaHandler implements SchemaHandler {
private final DslJson<Object> dslJson = new DslJson<>();
+ private static final FastThreadLocal<byte[]> tmpBuffer = new FastThreadLocal<byte[]>() {
+ @Override
+ protected byte[] initialValue() {
+ return new byte[1024];
+ }
+ };
+
public JSONSchemaHandler(List<PulsarColumnHandle> columnHandles) {
this.columnHandles = columnHandles;
}
@Override
- public Object deserialize(byte[] bytes) {
+ public Object deserialize(ByteBuf payload) {
+ // Since JSON deserializer only works on a byte[] we need to convert a direct mem buffer into
+ // a byte[].
+ int size = payload.readableBytes();
+ byte[] buffer = tmpBuffer.get();
+ if (buffer.length < size) {
+ // If the thread-local buffer is not big enough, replace it with
+ // a bigger one
+ buffer = new byte[size * 2];
+ tmpBuffer.set(buffer);
+ }
+
+ payload.readBytes(buffer, 0, size);
+
try {
- return dslJson.deserialize(Map.class, bytes, bytes.length);
+ return dslJson.deserialize(Map.class, buffer, size);
} catch (IOException e) {
- log.error(e);
+ log.error("Failed to deserialize Json object", e);
+ return null;
}
- return null;
}
@Override
public Object extractField(int index, Object currentRecord) {
try {
-
Map jsonObject = (Map) currentRecord;
PulsarColumnHandle pulsarColumnHandle = columnHandles.get(index);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
index 76da585..ca74e39 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.api.raw.RawMessage;
import java.util.Map;
import java.util.Set;
@@ -46,7 +46,7 @@ public abstract class PulsarInternalColumn {
}
@Override
- public Object getData(Message message) {
+ public Object getData(RawMessage message) {
return message.getEventTime() == 0 ? null : message.getEventTime();
}
}
@@ -58,7 +58,7 @@ public abstract class PulsarInternalColumn {
}
@Override
- public Object getData(Message message) {
+ public Object getData(RawMessage message) {
return message.getPublishTime();
}
}
@@ -70,7 +70,7 @@ public abstract class PulsarInternalColumn {
}
@Override
- public Object getData(Message message) {
+ public Object getData(RawMessage message) {
return message.getMessageId().toString();
}
}
@@ -82,7 +82,7 @@ public abstract class PulsarInternalColumn {
}
@Override
- public Object getData(Message message) {
+ public Object getData(RawMessage message) {
return message.getSequenceId();
}
}
@@ -94,7 +94,7 @@ public abstract class PulsarInternalColumn {
}
@Override
- public Object getData(Message message) {
+ public Object getData(RawMessage message) {
return message.getProducerName();
}
}
@@ -106,8 +106,8 @@ public abstract class PulsarInternalColumn {
}
@Override
- public Object getData(Message message) {
- return message.hasKey() ? message.getKey() : null;
+ public Object getData(RawMessage message) {
+ return message.getKey().orElse(null);
}
}
@@ -121,7 +121,7 @@ public abstract class PulsarInternalColumn {
}
@Override
- public Object getData(Message message) {
+ public Object getData(RawMessage message) {
try {
return mapper.writeValueAsString(message.getProperties());
} catch (JsonProcessingException e) {
@@ -201,5 +201,5 @@ public abstract class PulsarInternalColumn {
return builder.build();
}
- public abstract Object getData(Message message);
+ public abstract Object getData(RawMessage message);
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index e2198a8..9427f20 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -18,15 +18,35 @@
*/
package org.apache.pulsar.sql.presto;
+import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
+import static com.facebook.presto.spi.type.BigintType.BIGINT;
+import static com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
+import static com.facebook.presto.spi.type.DateType.DATE;
+import static com.facebook.presto.spi.type.IntegerType.INTEGER;
+import static com.facebook.presto.spi.type.RealType.REAL;
+import static com.facebook.presto.spi.type.SmallintType.SMALLINT;
+import static com.facebook.presto.spi.type.TimeType.TIME;
+import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
+import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
+import static com.facebook.presto.spi.type.TinyintType.TINYINT;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarbinaryType;
import com.facebook.presto.spi.type.VarcharType;
import com.google.common.annotations.VisibleForTesting;
+
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.avro.Schema;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
@@ -36,32 +56,14 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.impl.MessageParser;
+import org.apache.pulsar.common.api.raw.MessageParser;
+import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaType;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
-import static com.facebook.presto.spi.type.BigintType.BIGINT;
-import static com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
-import static com.facebook.presto.spi.type.DateType.DATE;
-import static com.facebook.presto.spi.type.IntegerType.INTEGER;
-import static com.facebook.presto.spi.type.RealType.REAL;
-import static com.facebook.presto.spi.type.SmallintType.SMALLINT;
-import static com.facebook.presto.spi.type.TimeType.TIME;
-import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
-import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
-import static com.facebook.presto.spi.type.TinyintType.TINYINT;
-import static com.google.common.base.Preconditions.checkArgument;
-
public class PulsarRecordCursor implements RecordCursor {
@@ -69,10 +71,10 @@ public class PulsarRecordCursor implements RecordCursor {
private PulsarSplit pulsarSplit;
private PulsarConnectorConfig pulsarConnectorConfig;
private ReadOnlyCursor cursor;
- private SpscArrayQueue<Message> messageQueue;
+ private SpscArrayQueue<RawMessage> messageQueue;
private SpscArrayQueue<Entry> entryQueue;
private Object currentRecord;
- private Message currentMessage;
+ private RawMessage currentMessage;
private Map<String, PulsarInternalColumn> internalColumnMap = PulsarInternalColumn.getInternalFieldsMap();
private SchemaHandler schemaHandler;
private int maxBatchSize;
@@ -126,8 +128,8 @@ public class PulsarRecordCursor implements RecordCursor {
this.pulsarSplit = pulsarSplit;
this.pulsarConnectorConfig = pulsarConnectorConfig;
this.maxBatchSize = pulsarConnectorConfig.getMaxEntryReadBatchSize();
- this.messageQueue = new SpscArrayQueue(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
- this.entryQueue = new SpscArrayQueue(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
+ this.messageQueue = new SpscArrayQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
+ this.entryQueue = new SpscArrayQueue<>(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
this.topicName = TopicName.get("persistent",
NamespaceName.get(pulsarSplit.getSchemaName()),
pulsarSplit.getTableName());
@@ -236,7 +238,7 @@ public class PulsarRecordCursor implements RecordCursor {
try {
MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
- entry.getDataBuffer(), (messageId, message, byteBuf) -> {
+ entry.getDataBuffer(), (message) -> {
try {
// start time for message queue read
metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
@@ -339,7 +341,7 @@ public class PulsarRecordCursor implements RecordCursor {
metricsTracker.incr_NUM_ENTRIES_PER_BATCH_SUCCESS(entries.size());
}
- public boolean hashFinished() {
+ public boolean hasFinished() {
return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >=1 && splitSize <= entriesProcessed;
}
@@ -355,7 +357,6 @@ public class PulsarRecordCursor implements RecordCursor {
}
}
-
@Override
public boolean advanceNextPosition() {
@@ -368,8 +369,13 @@ public class PulsarRecordCursor implements RecordCursor {
readEntries.run();
}
+ if (currentMessage != null) {
+ currentMessage.release();
+ currentMessage = null;
+ }
+
while(true) {
- if (readEntries.hashFinished()) {
+ if (readEntries.hasFinished()) {
return false;
}
@@ -495,6 +501,14 @@ public class PulsarRecordCursor implements RecordCursor {
@Override
public void close() {
+ log.info("Closing cursor record");
+
+ if (currentMessage != null) {
+ currentMessage.release();
+ }
+
+ messageQueue.drain(RawMessage::release);
+ entryQueue.drain(Entry::release);
if (deserializeEntries != null) {
deserializeEntries.interrupt();
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
index 4bd1171..e5688ae 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
@@ -47,6 +47,9 @@ public class PulsarSplit implements ConnectorSplit {
private final long endPositionLedgerId;
private final TupleDomain<ColumnHandle> tupleDomain;
+ private final PositionImpl startPosition;
+ private final PositionImpl endPosition;
+
@JsonCreator
public PulsarSplit(
@JsonProperty("splitId") long splitId,
@@ -73,6 +76,8 @@ public class PulsarSplit implements ConnectorSplit {
this.startPositionLedgerId = startPositionLedgerId;
this.endPositionLedgerId = endPositionLedgerId;
this.tupleDomain = requireNonNull(tupleDomain, "tupleDomain is null");
+ this.startPosition = PositionImpl.get(startPositionLedgerId, startPositionEntryId);
+ this.endPosition = PositionImpl.get(endPositionLedgerId, endPositionEntryId);
}
@JsonProperty
@@ -136,11 +141,11 @@ public class PulsarSplit implements ConnectorSplit {
}
public PositionImpl getStartPosition() {
- return PositionImpl.get(startPositionLedgerId, startPositionEntryId);
+ return startPosition;
}
public PositionImpl getEndPosition() {
- return PositionImpl.get(endPositionLedgerId, endPositionEntryId);
+ return endPosition;
}
@Override
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
index 6727b29..3cabc8a 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java
@@ -18,9 +18,11 @@
*/
package org.apache.pulsar.sql.presto;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+
public interface SchemaHandler {
- Object deserialize(byte[] bytes);
+ Object deserialize(ByteBuf payload);
Object extractField(int index, Object currentRecord);