You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/22 00:44:39 UTC
[pinot] branch master updated: Extract record keys, headers and metadata from Pulsar sources (#10995)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fe2b013a65 Extract record keys, headers and metadata from Pulsar sources (#10995)
fe2b013a65 is described below
commit fe2b013a657e1ad6ac508a9a37933961bc4c408b
Author: Jeff Bolle <Je...@users.noreply.github.com>
AuthorDate: Fri Jul 21 20:44:33 2023 -0400
Extract record keys, headers and metadata from Pulsar sources (#10995)
---
.../pinot/plugin/stream/pulsar/PulsarConfig.java | 43 ++++-
.../plugin/stream/pulsar/PulsarMessageBatch.java | 51 +-----
.../stream/pulsar/PulsarMetadataExtractor.java | 182 +++++++++++++++++++++
.../PulsarPartitionLevelConnectionHandler.java | 3 +-
.../pulsar/PulsarPartitionLevelConsumer.java | 26 +--
.../stream/pulsar/PulsarStreamLevelConsumer.java | 4 +-
.../plugin/stream/pulsar/PulsarStreamMessage.java | 47 ++++++
.../stream/pulsar/PulsarStreamMessageMetadata.java | 76 +++++++++
.../pinot/plugin/stream/pulsar/PulsarUtils.java | 40 +++++
.../plugin/stream/pulsar/PulsarConfigTest.java | 118 +++++++++++++
.../plugin/stream/pulsar/PulsarConsumerTest.java | 34 ++--
.../stream/pulsar/PulsarMessageBatchTest.java | 36 ++--
.../stream/pulsar/PulsarMetadataExtractorTest.java | 92 +++++++++++
.../pinot/spi/stream/StreamDataDecoderImpl.java | 9 +-
14 files changed, 669 insertions(+), 92 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
index 73ea2eca4b..4094a93f06 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
@@ -19,7 +19,13 @@
package org.apache.pinot.plugin.stream.pulsar;
import com.google.common.base.Preconditions;
+import java.util.Collections;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
@@ -37,6 +43,7 @@ public class PulsarConfig {
public static final String AUTHENTICATION_TOKEN = "authenticationToken";
public static final String TLS_TRUST_CERTS_FILE_PATH = "tlsTrustCertsFilePath";
public static final String ENABLE_KEY_VALUE_STITCH = "enableKeyValueStitch";
+ public static final String METADATA_FIELDS = "metadataFields"; // comma-separated list of metadata field keys to extract
private final String _pulsarTopicName;
private final String _subscriberId;
@@ -45,8 +52,10 @@ public class PulsarConfig {
private final SubscriptionInitialPosition _subscriptionInitialPosition;
private final String _authenticationToken;
private final String _tlsTrustCertsFilePath;
+ @Deprecated(since = "v0.13.* since pulsar supports record key extraction")
private final boolean _enableKeyValueStitch;
-
+ private final boolean _populateMetadata;
+ private final Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> _metadataFields;
public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
_pulsarTopicName = streamConfig.getTopicName();
@@ -71,8 +80,32 @@ public class PulsarConfig {
_subscriptionInitialPosition = PulsarUtils.offsetCriteriaToSubscription(offsetCriteria);
_initialMessageId = PulsarUtils.offsetCriteriaToMessageId(offsetCriteria);
+ _populateMetadata = Boolean.parseBoolean(streamConfig.getStreamConfigsMap().getOrDefault(
+ StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.METADATA_POPULATE),
+ "false"));
+ String metadataFieldsToExtractCSV = streamConfig.getStreamConfigsMap().getOrDefault(
+ StreamConfigProperties.constructStreamProperty(STREAM_TYPE, METADATA_FIELDS), "");
+ if (StringUtils.isBlank(metadataFieldsToExtractCSV) || !_populateMetadata) {
+ _metadataFields = Collections.emptySet();
+ } else {
+ _metadataFields = parseConfigStringToEnumSet(metadataFieldsToExtractCSV);
+ }
}
+ private Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> parseConfigStringToEnumSet(
+ String listOfMetadataFields) {
+ try {
+ String[] metadataFieldsArr = listOfMetadataFields.split(",");
+ return Stream.of(metadataFieldsArr)
+ .map(String::trim)
+ .filter(StringUtils::isNotBlank)
+ .map(PulsarStreamMessageMetadata.PulsarMessageMetadataValue::findByKey)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Invalid metadata fields list: " + listOfMetadataFields, e);
+ }
+ }
public String getPulsarTopicName() {
return _pulsarTopicName;
}
@@ -100,8 +133,14 @@ public class PulsarConfig {
public String getTlsTrustCertsFilePath() {
return _tlsTrustCertsFilePath;
}
-
public boolean getEnableKeyValueStitch() {
return _enableKeyValueStitch;
}
+ public boolean isPopulateMetadata() {
+ return _populateMetadata;
+ }
+
+ public Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> getMetadataFields() {
+ return _metadataFields;
+ }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
index 6df313b722..912e8bef23 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
@@ -21,16 +21,12 @@ package org.apache.pinot.plugin.stream.pulsar;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -39,13 +35,11 @@ import org.slf4j.LoggerFactory;
* plugins will not work. A custom decoder will be needed to unpack key and value byte arrays and decode
* them independently.
*/
-public class PulsarMessageBatch implements MessageBatch<byte[]> {
- private static final Logger LOGGER = LoggerFactory.getLogger(PulsarMessageBatch.class);
- private final List<Message<byte[]>> _messageList = new ArrayList<>();
- private static final ByteBuffer LENGTH_BUF = ByteBuffer.allocate(4);
+public class PulsarMessageBatch implements MessageBatch<PulsarStreamMessage> {
+ private final List<PulsarStreamMessage> _messageList = new ArrayList<>();
private final boolean _enableKeyValueStitch;
- public PulsarMessageBatch(Iterable<Message<byte[]>> iterable, boolean enableKeyValueStitch) {
+ public PulsarMessageBatch(Iterable<PulsarStreamMessage> iterable, boolean enableKeyValueStitch) {
iterable.forEach(_messageList::add);
_enableKeyValueStitch = enableKeyValueStitch;
}
@@ -56,26 +50,19 @@ public class PulsarMessageBatch implements MessageBatch<byte[]> {
}
@Override
- public byte[] getMessageAtIndex(int index) {
- Message<byte[]> msg = _messageList.get(index);
- if (_enableKeyValueStitch) {
- return stitchKeyValue(msg.getKeyBytes(), msg.getData());
- }
- return msg.getData();
+ public PulsarStreamMessage getMessageAtIndex(int index) {
+ return _messageList.get(index);
}
@Override
public int getMessageOffsetAtIndex(int index) {
- return ByteBuffer.wrap(_messageList.get(index).getData()).arrayOffset();
+ return ByteBuffer.wrap(_messageList.get(index).getValue()).arrayOffset();
}
@Override
public int getMessageLengthAtIndex(int index) {
- if (_enableKeyValueStitch) {
- Message<byte[]> msg = _messageList.get(index);
- return 8 + msg.getKeyBytes().length + msg.getData().length;
- }
- return _messageList.get(index).getData().length;
+ return _messageList.get(index).getValue().length; // if _enableKeyValueStitch is true, key and value were
+ // already stitched together by the consumer; otherwise the value is the raw message payload
}
/**
@@ -123,26 +110,4 @@ public class PulsarMessageBatch implements MessageBatch<byte[]> {
public long getNextStreamMessageOffsetAtIndex(int index) {
throw new UnsupportedOperationException("Pulsar does not support long stream offsets");
}
-
- /**
- * Stitch key and value bytes together using a simple format:
- * 4 bytes for key length + key bytes + 4 bytes for value length + value bytes
- */
- private byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
- int keyLen = keyBytes.length;
- int valueLen = valueBytes.length;
- int totalByteArrayLength = 8 + keyLen + valueLen;
- try (ByteArrayOutputStream bos = new ByteArrayOutputStream(totalByteArrayLength)) {
- LENGTH_BUF.clear();
- bos.write(LENGTH_BUF.putInt(keyLen).array());
- bos.write(keyBytes);
- LENGTH_BUF.clear();
- bos.write(LENGTH_BUF.putInt(valueLen).array());
- bos.write(valueBytes);
- return bos.toByteArray();
- } catch (Exception e) {
- LOGGER.error("Unable to stitch key and value bytes together", e);
- }
- return null;
- }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractor.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractor.java
new file mode 100644
index 0000000000..c33c32e7cb
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractor.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pulsar.client.api.Message;
+
+public interface PulsarMetadataExtractor {
+ static PulsarMetadataExtractor build(boolean populateMetadata,
+ Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataValuesToExtract) {
+ return message -> {
+ long publishTime = message.getPublishTime();
+ long brokerPublishTime = message.getBrokerPublishTime().orElse(0L);
+ long recordTimestamp = brokerPublishTime != 0 ? brokerPublishTime : publishTime;
+
+ Map<String, String> metadataMap = populateMetadataMap(populateMetadata, message, metadataValuesToExtract);
+
+ GenericRow headerGenericRow = populateMetadata ? buildGenericRow(message) : null;
+ return new PulsarStreamMessageMetadata(recordTimestamp, headerGenericRow, metadataMap);
+ };
+ }
+
+ RowMetadata extract(Message<?> record);
+
+ static GenericRow buildGenericRow(Message<?> message) {
+ if (MapUtils.isEmpty(message.getProperties())) {
+ return null;
+ }
+ GenericRow genericRow = new GenericRow();
+ for (Map.Entry<String, String> entry : message.getProperties().entrySet()) {
+ genericRow.putValue(entry.getKey(), entry.getValue());
+ }
+ return genericRow;
+ }
+
+ static Map<String, String> populateMetadataMap(boolean populateAllFields, Message<?> message,
+ Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataValuesToExtract) {
+
+ Map<String, String> metadataMap = new HashMap<>();
+ populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.EVENT_TIME, message, metadataMap);
+ populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PUBLISH_TIME, message, metadataMap);
+ populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.BROKER_PUBLISH_TIME, message,
+ metadataMap);
+ populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY, message, metadataMap);
+
+ // The fields above (timestamps used for lag calculation, plus the message key) are populated even if populateMetadata is false
+
+ if (!populateAllFields) {
+ return metadataMap;
+ }
+
+ for (PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue : metadataValuesToExtract) {
+ populateMetadataField(metadataValue, message, metadataMap);
+ }
+
+ return metadataMap;
+ }
+
+ private static void populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue value,
+ Message<?> message, Map<String, String> metadataMap) {
+ switch (value) {
+ case PUBLISH_TIME:
+ long publishTime = message.getPublishTime();
+ if (publishTime > 0) {
+ setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PUBLISH_TIME,
+ publishTime);
+ }
+ break;
+ case EVENT_TIME:
+ long eventTime = message.getEventTime();
+ if (eventTime > 0) {
+ setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.EVENT_TIME,
+ eventTime);
+ }
+ break;
+ case BROKER_PUBLISH_TIME:
+ message.getBrokerPublishTime()
+ .ifPresent(brokerPublishTime -> setMetadataMapField(metadataMap,
+ PulsarStreamMessageMetadata.PulsarMessageMetadataValue.BROKER_PUBLISH_TIME, brokerPublishTime));
+ break;
+ case MESSAGE_KEY:
+ setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY,
+ message.getKey());
+ break;
+ case MESSAGE_ID:
+ setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID,
+ message.getMessageId().toString());
+ break;
+ case MESSAGE_ID_BYTES_B64:
+ setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID_BYTES_B64,
+ message.getMessageId().toByteArray());
+ break;
+ case PRODUCER_NAME:
+ setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PRODUCER_NAME,
+ message.getProducerName());
+ break;
+ case SCHEMA_VERSION:
+ setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.SCHEMA_VERSION,
+ message.getSchemaVersion());
+ break;
+ case SEQUENCE_ID:
+ setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.SEQUENCE_ID,
+ message.getSequenceId());
+ break;
+ case ORDERING_KEY:
+ if (message.hasOrderingKey()) {
+ setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.ORDERING_KEY,
+ message.getOrderingKey());
+ }
+ break;
+ case SIZE:
+ setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.SIZE,
+ message.size());
+ break;
+ case TOPIC_NAME:
+ setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.TOPIC_NAME,
+ message.getTopicName());
+ break;
+ case INDEX:
+ message.getIndex().ifPresent(index -> setMetadataMapField(metadataMap,
+ PulsarStreamMessageMetadata.PulsarMessageMetadataValue.INDEX, index));
+ break;
+ case REDELIVERY_COUNT:
+ setMetadataMapField(metadataMap, PulsarStreamMessageMetadata.PulsarMessageMetadataValue.REDELIVERY_COUNT,
+ message.getRedeliveryCount());
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported metadata value: " + value);
+ }
+ }
+
+ private static void setMetadataMapField(Map<String, String> metadataMap,
+ PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue,
+ String value) {
+ if (StringUtils.isNotBlank(value)) {
+ metadataMap.put(metadataValue.getKey(), value);
+ }
+ }
+
+ private static void setMetadataMapField(Map<String, String> metadataMap,
+ PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue,
+ int value) {
+ setMetadataMapField(metadataMap, metadataValue, String.valueOf(value));
+ }
+
+ private static void setMetadataMapField(Map<String, String> metadataMap,
+ PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue,
+ long value) {
+ setMetadataMapField(metadataMap, metadataValue, String.valueOf(value));
+ }
+
+ private static void setMetadataMapField(Map<String, String> metadataMap,
+ PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue,
+ byte[] value) {
+ if (value != null && value.length > 0) {
+ setMetadataMapField(metadataMap, metadataValue, Base64.getEncoder().encodeToString(value));
+ }
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
index 3ad57b55bb..11033ec716 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
@@ -40,6 +40,7 @@ public class PulsarPartitionLevelConnectionHandler {
protected final PulsarConfig _config;
protected final String _clientId;
protected PulsarClient _pulsarClient = null;
+ protected final PulsarMetadataExtractor _pulsarMetadataExtractor;
/**
* Creates a new instance of {@link PulsarClient} and {@link Reader}
@@ -47,7 +48,7 @@ public class PulsarPartitionLevelConnectionHandler {
public PulsarPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig) {
_config = new PulsarConfig(streamConfig, clientId);
_clientId = clientId;
-
+ _pulsarMetadataExtractor = PulsarMetadataExtractor.build(_config.isPopulateMetadata(), _config.getMetadataFields());
try {
ClientBuilder pulsarClientBuilder = PulsarClient.builder().serviceUrl(_config.getBootstrapServers());
if (_config.getTlsTrustCertsFilePath() != null) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
index 0e55a07aa3..d1b80b0360 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.StreamConfig;
@@ -48,15 +47,15 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
private static final Logger LOGGER = LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class);
private final ExecutorService _executorService;
private final Reader _reader;
- private boolean _enableKeyValueStitch = false;
+ private boolean _enableKeyValueStitch;
public PulsarPartitionLevelConsumer(String clientId, StreamConfig streamConfig,
PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
super(clientId, streamConfig);
PulsarConfig config = new PulsarConfig(streamConfig, clientId);
- _reader = createReaderForPartition(config.getPulsarTopicName(),
- partitionGroupConsumptionStatus.getPartitionGroupId(),
- config.getInitialMessageId());
+ _reader =
+ createReaderForPartition(config.getPulsarTopicName(), partitionGroupConsumptionStatus.getPartitionGroupId(),
+ config.getInitialMessageId());
LOGGER.info("Created pulsar reader with id {} for topic {} partition {}", _reader, _config.getPulsarTopicName(),
partitionGroupConsumptionStatus.getPartitionGroupId());
_executorService = Executors.newSingleThreadExecutor();
@@ -64,19 +63,19 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
}
/**
- * Fetch records from the Pulsar stream between the start and end KinesisCheckpoint
+ * Fetch records from the Pulsar stream between the start and end StreamPartitionMsgOffset
* Used {@link org.apache.pulsar.client.api.Reader} to read the messaged from pulsar partitioned topic
* The reader seeks to the startMsgOffset and starts reading records in a loop until endMsgOffset or timeout is
* reached.
*/
@Override
- public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset,
- int timeoutMillis) {
+ public PulsarMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset,
+ StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
final MessageId startMessageId = ((MessageIdStreamOffset) startMsgOffset).getMessageId();
final MessageId endMessageId =
endMsgOffset == null ? MessageId.latest : ((MessageIdStreamOffset) endMsgOffset).getMessageId();
- List<Message<byte[]>> messagesList = new ArrayList<>();
+ List<PulsarStreamMessage> messagesList = new ArrayList<>();
Future<PulsarMessageBatch> pulsarResultFuture =
_executorService.submit(() -> fetchMessages(startMessageId, endMessageId, messagesList));
@@ -96,7 +95,7 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
}
public PulsarMessageBatch fetchMessages(MessageId startMessageId, MessageId endMessageId,
- List<Message<byte[]>> messagesList) {
+ List<PulsarStreamMessage> messagesList) {
try {
_reader.seek(startMessageId);
@@ -108,7 +107,8 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
break;
}
}
- messagesList.add(nextMessage);
+ messagesList.add(
+ PulsarUtils.buildPulsarStreamMessage(nextMessage, _enableKeyValueStitch, _pulsarMetadataExtractor));
if (Thread.interrupted()) {
break;
@@ -124,11 +124,11 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
}
}
- private Iterable<Message<byte[]>> buildOffsetFilteringIterable(final List<Message<byte[]>> messageAndOffsets,
+ private Iterable<PulsarStreamMessage> buildOffsetFilteringIterable(final List<PulsarStreamMessage> messageAndOffsets,
final MessageId startOffset, final MessageId endOffset) {
return Iterables.filter(messageAndOffsets, input -> {
// Filter messages that are either null or have an offset ∉ [startOffset, endOffset]
- return input != null && input.getData() != null && (input.getMessageId().compareTo(startOffset) >= 0) && (
+ return input != null && input.getValue() != null && (input.getMessageId().compareTo(startOffset) >= 0) && (
(endOffset == null) || (input.getMessageId().compareTo(endOffset) < 0));
});
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
index 60272c6212..82040f6de3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
@@ -81,9 +81,9 @@ public class PulsarStreamLevelConsumer implements StreamLevelConsumer {
// Log every minute or 100k events
if (now - _lastLogTime > 60000 || _currentCount - _lastCount >= 100000) {
if (_lastCount == 0) {
- _logger.info("Consumed {} events from kafka stream {}", _currentCount, _streamConfig.getTopicName());
+ _logger.info("Consumed {} events from pulsar stream {}", _currentCount, _streamConfig.getTopicName());
} else {
- _logger.info("Consumed {} events from kafka stream {} (rate:{}/s)", _currentCount - _lastCount,
+ _logger.info("Consumed {} events from pulsar stream {} (rate:{}/s)", _currentCount - _lastCount,
_streamConfig.getTopicName(), (float) (_currentCount - _lastCount) * 1000 / (now - _lastLogTime));
}
_lastCount = _currentCount;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessage.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessage.java
new file mode 100644
index 0000000000..7e09197857
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessage.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pulsar.client.api.MessageId;
+
+public class PulsarStreamMessage extends StreamMessage<byte[]> {
+
+ private final MessageId _messageId;
+ public PulsarStreamMessage(@Nullable byte[] key, byte[] value, MessageId messageId,
+ @Nullable PulsarStreamMessageMetadata metadata, int length) {
+ super(key, value, metadata, length);
+ _messageId = messageId;
+ }
+
+ public MessageId getMessageId() {
+ return _messageId;
+ }
+
+ int getKeyLength() {
+ byte[] key = getKey();
+ return key == null ? 0 : key.length;
+ }
+
+ int getValueLength() {
+ byte[] value = getValue();
+ return value == null ? 0 : value.length;
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessageMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessageMetadata.java
new file mode 100644
index 0000000000..59220138d7
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessageMetadata.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.util.EnumSet;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+
+/**
+ * Pulsar specific implementation of {@link StreamMessageMetadata}
+ * Pulsar exposes many metadata values for each message; see the Pulsar documentation for details.
+ * @see <a href="https://pulsar.apache.org/docs/en/concepts-messaging/#message-properties">Pulsar Message Properties</a>
+ */
+public class PulsarStreamMessageMetadata extends StreamMessageMetadata {
+
+ public enum PulsarMessageMetadataValue {
+ PUBLISH_TIME("publishTime"),
+ EVENT_TIME("eventTime"),
+ BROKER_PUBLISH_TIME("brokerPublishTime"),
+ MESSAGE_KEY("key"),
+ MESSAGE_ID("messageId"),
+ MESSAGE_ID_BYTES_B64("messageIdBytes"),
+ PRODUCER_NAME("producerName"),
+ SCHEMA_VERSION("schemaVersion"),
+ SEQUENCE_ID("sequenceId"),
+ ORDERING_KEY("orderingKey"),
+ SIZE("size"),
+ TOPIC_NAME("topicName"),
+ INDEX("index"),
+ REDELIVERY_COUNT("redeliveryCount");
+
+ private final String _key;
+
+ PulsarMessageMetadataValue(String key) {
+ _key = key;
+ }
+
+ public String getKey() {
+ return _key;
+ }
+
+ public static PulsarMessageMetadataValue findByKey(final String key) {
+ EnumSet<PulsarMessageMetadataValue> values = EnumSet.allOf(PulsarMessageMetadataValue.class);
+ return values.stream().filter(value -> value.getKey().equals(key)).findFirst().orElse(null);
+ }
+ }
+
+ public PulsarStreamMessageMetadata(long recordIngestionTimeMs,
+ @Nullable GenericRow headers) {
+ super(recordIngestionTimeMs, headers);
+ }
+
+ public PulsarStreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers,
+ Map<String, String> metadata) {
+ super(recordIngestionTimeMs, headers, metadata);
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
index 763b0fc0d4..d22f8b0b5f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
@@ -18,13 +18,22 @@
*/
package org.apache.pinot.plugin.stream.pulsar;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PulsarUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PulsarUtils.class);
+
+ private static final ByteBuffer LENGTH_BUF = ByteBuffer.allocate(4);
+
private PulsarUtils() {
}
@@ -51,4 +60,35 @@ public class PulsarUtils {
throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
}
+
+  /**
+   * Stitch key and value bytes together using a simple format:
+   * 4 bytes for key length + key bytes + 4 bytes for value length + value bytes.
+   *
+   * <p>Lengths are written big-endian (the {@link ByteBuffer} default byte order). A {@code null} key or value is
+   * encoded as a zero-length section instead of throwing a {@link NullPointerException}.
+   *
+   * <p>Safe to call concurrently from multiple consumer threads: the length buffer is allocated per call.
+   *
+   * @param keyBytes the record key bytes; may be {@code null}
+   * @param valueBytes the record value bytes; may be {@code null}
+   * @return the stitched bytes, or {@code null} if writing to the in-memory stream fails (logged; not expected
+   *     in practice for a {@link ByteArrayOutputStream})
+   */
+  protected static byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
+    byte[] key = (keyBytes != null) ? keyBytes : new byte[0];
+    byte[] value = (valueBytes != null) ? valueBytes : new byte[0];
+    int totalByteArrayLength = 2 * Integer.BYTES + key.length + value.length;
+    // Per-call buffer: a shared static ByteBuffer races when several consumer threads stitch at the same time.
+    ByteBuffer lengthBuf = ByteBuffer.allocate(Integer.BYTES);
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream(totalByteArrayLength)) {
+      bos.write(lengthBuf.putInt(key.length).array());
+      bos.write(key);
+      lengthBuf.clear();
+      bos.write(lengthBuf.putInt(value.length).array());
+      bos.write(value);
+      return bos.toByteArray();
+    } catch (Exception e) {
+      LOGGER.error("Unable to stitch key and value bytes together", e);
+    }
+    return null;
+  }
+
+  /**
+   * Wraps a raw Pulsar message into a {@link PulsarStreamMessage}.
+   *
+   * @param message the Pulsar message to wrap
+   * @param enableKeyValueStitch when {@code true}, the value is the key and payload combined via
+   *     {@code stitchKeyValue}; otherwise the raw payload is used
+   * @param pulsarMetadataExtractor extractor whose result is cast to {@link PulsarStreamMessageMetadata}
+   * @return the wrapped message; its length is the value length, or 0 when the value is {@code null}
+   */
+  protected static PulsarStreamMessage buildPulsarStreamMessage(Message<byte[]> message, boolean enableKeyValueStitch,
+      PulsarMetadataExtractor pulsarMetadataExtractor) {
+    byte[] keyBytes = message.getKeyBytes();
+    byte[] valueBytes;
+    if (enableKeyValueStitch) {
+      valueBytes = stitchKeyValue(keyBytes, message.getData());
+    } else {
+      valueBytes = message.getData();
+    }
+    int valueLength = (valueBytes == null) ? 0 : valueBytes.length;
+    MessageId messageId = message.getMessageId();
+    PulsarStreamMessageMetadata metadata = (PulsarStreamMessageMetadata) pulsarMetadataExtractor.extract(message);
+    return new PulsarStreamMessage(keyBytes, valueBytes, messageId, metadata, valueLength);
+  }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
new file mode 100644
index 0000000000..ad23c83ce0
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class PulsarConfigTest {
+  public static final String TABLE_NAME_WITH_TYPE = "tableName_REALTIME";
+
+  public static final String STREAM_TYPE = "pulsar";
+  public static final String STREAM_PULSAR_BROKER_LIST = "pulsar://localhost:6650";
+  public static final String STREAM_PULSAR_CONSUMER_TYPE = "simple";
+
+  // Deliberately contains stray spaces and a trailing comma to exercise lenient parsing.
+  private static final String CONFIGURED_METADATA_FIELDS =
+      "messageId,messageIdBytes, publishTime, eventTime, key, topicName, ";
+
+  /**
+   * Builds the minimal Pulsar stream config shared by every test; each test adds its metadata properties.
+   */
+  Map<String, String> getCommonStreamConfigMap() {
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", STREAM_TYPE);
+    streamConfigMap.put("stream.pulsar.consumer.type", STREAM_PULSAR_CONSUMER_TYPE);
+    streamConfigMap.put("stream.pulsar.topic.name", "test-topic");
+    streamConfigMap.put("stream.pulsar.bootstrap.servers", STREAM_PULSAR_BROKER_LIST);
+    streamConfigMap.put("stream.pulsar.consumer.prop.auto.offset.reset", "smallest");
+    streamConfigMap.put("stream.pulsar.consumer.factory.class.name", PulsarConsumerFactory.class.getName());
+    streamConfigMap.put(streamProperty(StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS), "1000");
+    streamConfigMap.put("stream.pulsar.decoder.class.name", "decoderClass");
+    return streamConfigMap;
+  }
+
+  @Test
+  public void testParsingMetadataConfigWithConfiguredFields() throws Exception {
+    Map<String, String> streamConfigMap = getCommonStreamConfigMap();
+    streamConfigMap.put(streamProperty(StreamConfigProperties.METADATA_POPULATE), "true");
+    streamConfigMap.put(streamProperty(PulsarConfig.METADATA_FIELDS), CONFIGURED_METADATA_FIELDS);
+
+    Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataFieldsToExtract =
+        toPulsarConfig(streamConfigMap).getMetadataFields();
+
+    // The blank entry produced by the trailing comma must not count: exactly six named fields are expected.
+    Assert.assertEquals(metadataFieldsToExtract.size(), 6);
+    Assert.assertTrue(metadataFieldsToExtract.containsAll(List.of(
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID_BYTES_B64,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PUBLISH_TIME,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.EVENT_TIME,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.TOPIC_NAME)));
+  }
+
+  @Test
+  public void testParsingMetadataConfigWithoutConfiguredFields() throws Exception {
+    Map<String, String> streamConfigMap = getCommonStreamConfigMap();
+    streamConfigMap.put(streamProperty(StreamConfigProperties.METADATA_POPULATE), "true");
+
+    Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataFieldsToExtract =
+        toPulsarConfig(streamConfigMap).getMetadataFields();
+    Assert.assertEquals(metadataFieldsToExtract.size(), 0);
+  }
+
+  @Test
+  public void testParsingNoMetadataConfig() throws Exception {
+    Map<String, String> streamConfigMap = getCommonStreamConfigMap();
+    streamConfigMap.put(streamProperty(StreamConfigProperties.METADATA_POPULATE), "false");
+
+    PulsarConfig pulsarConfig = toPulsarConfig(streamConfigMap);
+    Assert.assertFalse(pulsarConfig.isPopulateMetadata());
+    Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataFieldsToExtract =
+        pulsarConfig.getMetadataFields();
+    Assert.assertEquals(metadataFieldsToExtract.size(), 0);
+  }
+
+  @Test
+  public void testParsingNoMetadataConfigWithConfiguredFields() throws Exception {
+    Map<String, String> streamConfigMap = getCommonStreamConfigMap();
+    streamConfigMap.put(streamProperty(StreamConfigProperties.METADATA_POPULATE), "false");
+    streamConfigMap.put(streamProperty(PulsarConfig.METADATA_FIELDS), CONFIGURED_METADATA_FIELDS);
+
+    PulsarConfig pulsarConfig = toPulsarConfig(streamConfigMap);
+    Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataFieldsToExtract =
+        pulsarConfig.getMetadataFields();
+    // With metadata population disabled, configured fields are expected to be ignored.
+    Assert.assertFalse(pulsarConfig.isPopulateMetadata());
+    Assert.assertEquals(metadataFieldsToExtract.size(), 0);
+  }
+
+  /** Returns the fully-qualified stream property name for {@link #STREAM_TYPE}. */
+  private static String streamProperty(String property) {
+    return StreamConfigProperties.constructStreamProperty(STREAM_TYPE, property);
+  }
+
+  /** Wraps a raw stream config map into a {@link PulsarConfig} for the test table. */
+  private static PulsarConfig toPulsarConfig(Map<String, String> streamConfigMap) throws Exception {
+    StreamConfig streamConfig = new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap);
+    return new PulsarConfig(streamConfig, "testId");
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
index 4779e6afcd..9d59f82fcc 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
@@ -281,34 +281,35 @@ public class PulsarConsumerTest {
int totalMessagesReceived = 0;
- final PartitionGroupConsumer consumer =
- streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID, partitionGroupConsumptionStatus);
- final MessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
+ final PulsarPartitionLevelConsumer consumer =
+ (PulsarPartitionLevelConsumer) streamConsumerFactory
+ .createPartitionGroupConsumer(CLIENT_ID, partitionGroupConsumptionStatus);
+ final PulsarMessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)), CONSUMER_FETCH_TIMEOUT_MILLIS);
Assert.assertEquals(messageBatch1.getMessageCount(), 500);
for (int i = 0; i < messageBatch1.getMessageCount(); i++) {
- final byte[] msg = (byte[]) messageBatch1.getMessageAtIndex(i);
+ final byte[] msg = messageBatch1.getMessageAtIndex(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + i);
totalMessagesReceived++;
}
- final MessageBatch messageBatch2 =
+ final PulsarMessageBatch messageBatch2 =
consumer.fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)), null,
CONSUMER_FETCH_TIMEOUT_MILLIS);
Assert.assertEquals(messageBatch2.getMessageCount(), 500);
for (int i = 0; i < messageBatch2.getMessageCount(); i++) {
- final byte[] msg = (byte[]) messageBatch2.getMessageAtIndex(i);
+ final byte[] msg = messageBatch2.getMessageAtIndex(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i));
totalMessagesReceived++;
}
- final MessageBatch messageBatch3 =
+ final PulsarMessageBatch messageBatch3 =
consumer.fetchMessages(new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 10)),
new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 35)),
CONSUMER_FETCH_TIMEOUT_MILLIS);
Assert.assertEquals(messageBatch3.getMessageCount(), 25);
for (int i = 0; i < messageBatch3.getMessageCount(); i++) {
- final byte[] msg = (byte[]) messageBatch3.getMessageAtIndex(i);
+ final byte[] msg = messageBatch3.getMessageAtIndex(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i));
}
@@ -333,36 +334,37 @@ public class PulsarConsumerTest {
int totalMessagesReceived = 0;
- final PartitionGroupConsumer consumer =
- streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID, partitionGroupConsumptionStatus);
+ final PulsarPartitionLevelConsumer consumer =
+ (PulsarPartitionLevelConsumer) streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID,
+ partitionGroupConsumptionStatus);
//TODO: This test failed, check it out.
- final MessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
+ final PulsarMessageBatch messageBatch1 = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)),
CONSUMER_FETCH_TIMEOUT_MILLIS);
Assert.assertEquals(messageBatch1.getMessageCount(), 500);
for (int i = 0; i < messageBatch1.getMessageCount(); i++) {
- final byte[] msg = (byte[]) messageBatch1.getMessageAtIndex(i);
+ final byte[] msg = messageBatch1.getMessageAtIndex(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + i);
totalMessagesReceived++;
}
- final MessageBatch messageBatch2 =
+ final PulsarMessageBatch messageBatch2 =
consumer.fetchMessages(new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)), null,
CONSUMER_FETCH_TIMEOUT_MILLIS);
Assert.assertEquals(messageBatch2.getMessageCount(), 500);
for (int i = 0; i < messageBatch2.getMessageCount(); i++) {
- final byte[] msg = (byte[]) messageBatch2.getMessageAtIndex(i);
+ final byte[] msg = messageBatch2.getMessageAtIndex(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i));
totalMessagesReceived++;
}
- final MessageBatch messageBatch3 =
+ final PulsarMessageBatch messageBatch3 =
consumer.fetchMessages(new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 10)),
new MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 35)),
CONSUMER_FETCH_TIMEOUT_MILLIS);
Assert.assertEquals(messageBatch3.getMessageCount(), 25);
for (int i = 0; i < messageBatch3.getMessageCount(); i++) {
- final byte[] msg = (byte[]) messageBatch3.getMessageAtIndex(i);
+ final byte[] msg = messageBatch3.getMessageAtIndex(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i));
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
index 7cc0a99a6c..904dd33a04 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
@@ -20,10 +20,13 @@ package org.apache.pinot.plugin.stream.pulsar;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
+import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.EncryptionContext;
@@ -37,30 +40,33 @@ public class PulsarMessageBatchTest {
private DummyPulsarMessage _msgWithKeyAndValue;
private byte[] _expectedValueBytes;
private byte[] _expectedKeyBytes;
- private List<Message<byte[]>> _messageList;
+ private List<DummyPulsarMessage> _messageList;
+ private PulsarMetadataExtractor _metadataExtractor;
- class DummyPulsarMessage implements Message<byte[]> {
+ public static class DummyPulsarMessage implements Message<byte[]> {
private final byte[] _keyData;
private final byte[] _valueData;
+ private Map<String, String> _properties;
public DummyPulsarMessage(byte[] key, byte[] value) {
_keyData = key;
_valueData = value;
+ _properties = new HashMap<>();
}
@Override
public Map<String, String> getProperties() {
- return null;
+ return _properties;
}
@Override
public boolean hasProperty(String name) {
- return false;
+ return _properties.containsKey(name);
}
@Override
public String getProperty(String name) {
- return null;
+ return _properties.get(name);
}
@Override
@@ -80,7 +86,7 @@ public class PulsarMessageBatchTest {
@Override
public MessageId getMessageId() {
- return null;
+ return MessageId.earliest;
}
@Override
@@ -110,7 +116,7 @@ public class PulsarMessageBatchTest {
@Override
public String getKey() {
- return _keyData.toString();
+ return new String(_keyData);
}
@Override
@@ -196,20 +202,28 @@ public class PulsarMessageBatchTest {
_random.nextBytes(_expectedKeyBytes);
_msgWithKeyAndValue = new DummyPulsarMessage(_expectedKeyBytes, _expectedValueBytes);
_messageList = new ArrayList<>();
+ _metadataExtractor = PulsarMetadataExtractor.build(true,
+ EnumSet.allOf(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.class));
_messageList.add(_msgWithKeyAndValue);
}
@Test
public void testMessageBatchNoStitching() {
- PulsarMessageBatch messageBatch = new PulsarMessageBatch(_messageList, false);
- byte[] valueBytes = messageBatch.getMessageAtIndex(0);
+ List<PulsarStreamMessage> streamMessages = _messageList.stream().map(message ->
+ PulsarUtils.buildPulsarStreamMessage(message, false, _metadataExtractor))
+ .collect(Collectors.toList());
+ PulsarMessageBatch messageBatch = new PulsarMessageBatch(streamMessages, false);
+ byte[] valueBytes = messageBatch.getMessageAtIndex(0).getValue();
Assert.assertArrayEquals(_expectedValueBytes, valueBytes);
}
@Test
public void testMessageBatchWithStitching() {
- PulsarMessageBatch messageBatch = new PulsarMessageBatch(_messageList, true);
- byte[] keyValueBytes = messageBatch.getMessageAtIndex(0);
+ List<PulsarStreamMessage> streamMessages = _messageList.stream().map(message ->
+ PulsarUtils.buildPulsarStreamMessage(message, true, _metadataExtractor))
+ .collect(Collectors.toList());
+ PulsarMessageBatch messageBatch = new PulsarMessageBatch(streamMessages, true);
+ byte[] keyValueBytes = messageBatch.getMessageAtIndex(0).getValue();
Assert.assertEquals(keyValueBytes.length, 8 + _expectedKeyBytes.length + _expectedValueBytes.length);
try {
ByteBuffer byteBuffer = ByteBuffer.wrap(keyValueBytes);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractorTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractorTest.java
new file mode 100644
index 0000000000..4c8e28021a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractorTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Set;
+import org.apache.pulsar.client.api.MessageId;
+import org.bouncycastle.util.encoders.Base64;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.plugin.stream.pulsar.PulsarMessageBatchTest.DummyPulsarMessage;
+import static org.apache.pinot.plugin.stream.pulsar.PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID;
+import static org.apache.pinot.plugin.stream.pulsar.PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID_BYTES_B64;
+import static org.apache.pinot.plugin.stream.pulsar.PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY;
+import static org.testng.Assert.assertEquals;
+
+
+public class PulsarMetadataExtractorTest {
+
+  private PulsarMetadataExtractor _metadataExtractor;
+
+  @BeforeClass
+  public void setup() {
+    _metadataExtractor = PulsarMetadataExtractor.build(true, Set.of(MESSAGE_ID, MESSAGE_ID_BYTES_B64, MESSAGE_KEY));
+  }
+
+  /**
+   * Verifies that message properties become headers and that key / message-id metadata fields are populated.
+   * Note: TestNG's {@code assertEquals} signature is (actual, expected).
+   */
+  @Test
+  public void testExtractProperty()
+      throws Exception {
+    DummyPulsarMessage pulsarMessage =
+        new DummyPulsarMessage("key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8));
+    pulsarMessage.getProperties().put("test_key", "test_value");
+    pulsarMessage.getProperties().put("test_key2", "2");
+    PulsarStreamMessageMetadata metadata = (PulsarStreamMessageMetadata) _metadataExtractor.extract(pulsarMessage);
+    assertEquals(metadata.getHeaders().getValue("test_key"), "test_value");
+    assertEquals(metadata.getHeaders().getValue("test_key2"), "2");
+    assertEquals(metadata.getRecordMetadata().get(MESSAGE_KEY.getKey()), "key");
+    String messageIdStr = metadata.getRecordMetadata().get(MESSAGE_ID.getKey());
+    assertEquals(messageIdStr, pulsarMessage.getMessageId().toString());
+
+    // The base64-encoded id bytes must round-trip back to the original MessageId.
+    byte[] messageIdBytes = Base64.decode(metadata.getRecordMetadata().get(MESSAGE_ID_BYTES_B64.getKey()));
+    MessageId messageId = MessageId.fromByteArray(messageIdBytes);
+    assertEquals(messageId, MessageId.earliest);
+  }
+
+  /** Without stitching, the value is the raw payload and the key is passed through unchanged. */
+  @Test
+  public void testPulsarSteamMessageUnstitched() {
+    String key = "key";
+    String value = "value";
+    DummyPulsarMessage dummyPulsarMessage =
+        new DummyPulsarMessage(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+    PulsarStreamMessage streamMessage =
+        PulsarUtils.buildPulsarStreamMessage(dummyPulsarMessage, false, _metadataExtractor);
+    assertEquals(streamMessage.getKey(), key.getBytes(StandardCharsets.UTF_8));
+    assertEquals(streamMessage.getValue(), value.getBytes(StandardCharsets.UTF_8));
+    assertEquals(streamMessage.getKeyLength(), key.getBytes(StandardCharsets.UTF_8).length);
+    assertEquals(streamMessage.getValueLength(), value.getBytes(StandardCharsets.UTF_8).length);
+  }
+
+  /** With stitching, the value is the length-prefixed key/value encoding produced by stitchKeyValue. */
+  @Test
+  public void testPulsarSteamMessageStitched() {
+    String key = "key";
+    String value = "value";
+    byte[] stitchedValueBytes =
+        PulsarUtils.stitchKeyValue(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+    DummyPulsarMessage dummyPulsarMessage =
+        new DummyPulsarMessage(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+    PulsarStreamMessage streamMessage =
+        PulsarUtils.buildPulsarStreamMessage(dummyPulsarMessage, true, _metadataExtractor);
+    assertEquals(streamMessage.getKey(), key.getBytes(StandardCharsets.UTF_8));
+    assertEquals(streamMessage.getValue(), stitchedValueBytes);
+    assertEquals(streamMessage.getKeyLength(), key.getBytes(StandardCharsets.UTF_8).length);
+    assertEquals(streamMessage.getValueLength(), stitchedValueBytes.length);
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
index 97958b92d3..b570067a69 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
@@ -50,10 +50,11 @@ public class StreamDataDecoderImpl implements StreamDataDecoder {
row.putValue(KEY, new String(message.getKey(), StandardCharsets.UTF_8));
}
RowMetadata metadata = message.getMetadata();
- if (metadata != null && metadata.getHeaders() != null) {
- metadata.getHeaders().getFieldToValueMap()
- .forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key, value));
-
+ if (metadata != null) {
+ if (metadata.getHeaders() != null) {
+ metadata.getHeaders().getFieldToValueMap()
+ .forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key, value));
+ }
metadata.getRecordMetadata()
.forEach((key, value) -> row.putValue(METADATA_KEY_PREFIX + key, value));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org