You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/12/21 01:29:43 UTC
[pulsar] branch master updated: [improve][broker][client] PIP-215 Added TopicCompactionStrategy for StrategicTwoPhaseCompactor and TableView. (#18195)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 05e6f5e3c71 [improve][broker][client] PIP-215 Added TopicCompactionStrategy for StrategicTwoPhaseCompactor and TableView. (#18195)
05e6f5e3c71 is described below
commit 05e6f5e3c71426ee30a8ad1852456f7be897bdf3
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Tue Dec 20 17:29:34 2022 -0800
[improve][broker][client] PIP-215 Added TopicCompactionStrategy for StrategicTwoPhaseCompactor and TableView. (#18195)
---
pulsar-broker/pom.xml | 1 -
.../pulsar/client/impl/CompactionReaderImpl.java | 95 +++++
.../client/impl/RawBatchMessageContainerImpl.java | 171 ++++++++
.../org/apache/pulsar/compaction/Compactor.java | 4 +-
.../compaction/StrategicTwoPhaseCompactor.java | 434 +++++++++++++++++++++
.../pulsar/compaction/TwoPhaseCompactor.java | 8 +-
.../client/impl/CompactionReaderImplTest.java | 111 ++++++
.../client/impl/ProducerMemoryLimitTest.java | 8 +
.../pulsar/client/impl/ProducerSemaphoreTest.java | 8 +
.../impl/RawBatchMessageContainerImplTest.java | 283 ++++++++++++++
.../apache/pulsar/client/impl/TableViewTest.java | 48 +++
.../pulsar/compaction/CompactionRetentionTest.java | 24 +-
.../apache/pulsar/compaction/CompactionTest.java | 263 +++++++------
.../apache/pulsar/compaction/CompactorTest.java | 35 +-
.../compaction/NumericOrderCompactionStrategy.java | 38 ++
.../StrategicCompactionRetentionTest.java | 45 +++
.../pulsar/compaction/StrategicCompactionTest.java | 152 ++++++++
.../pulsar/compaction/StrategicCompactorTest.java | 49 +++
.../compaction/TopicCompactionStrategyTest.java | 54 +++
.../org/apache/pulsar/client/api/TableView.java | 10 +-
.../client/impl/BatchMessageContainerImpl.java | 33 +-
.../org/apache/pulsar/client/impl/ReaderImpl.java | 4 +-
.../client/impl/TableViewConfigurationData.java | 2 +
.../apache/pulsar/client/impl/TableViewImpl.java | 70 +++-
.../client/impl/conf/ReaderConfigurationData.java | 6 +
.../common/topics/TopicCompactionStrategy.java | 76 ++++
26 files changed, 1857 insertions(+), 175 deletions(-)
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 136bd670d5e..672c11da03e 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -182,7 +182,6 @@
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-messagecrypto-bc</artifactId>
<version>${project.version}</version>
- <scope>test</scope>
</dependency>
<dependency>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java
new file mode 100644
index 00000000000..5961cf0fae7
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.api.proto.CommandAck;
+
+/**
+ * An extended ReaderImpl used for StrategicTwoPhaseCompactor.
+ * The compaction consumer subscription is durable and consumes compacted messages from the earliest position.
+ * It does not acknowledge messages after each read; callers must invoke acknowledgeCumulativeAsync to ack them.
+ */
+@Slf4j
+public class CompactionReaderImpl<T> extends ReaderImpl<T> {
+
+ // Consumer obtained from ReaderImpl#getConsumer(); every read/ack below is delegated to it.
+ ConsumerBase<T> consumer;
+
+ // Configuration this reader was created with (see create(...)).
+ ReaderConfigurationData<T> readerConfiguration;
+ // Private: instances are built through create(...), which prepares the compaction-specific configuration.
+ private CompactionReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConfiguration,
+ ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> consumerFuture,
+ Schema<T> schema) {
+ super(client, readerConfiguration, executorProvider, consumerFuture, schema);
+ this.readerConfiguration = readerConfiguration;
+ this.consumer = getConsumer();
+ }
+
+ /**
+ * Creates a reader on {@code topic} using the compaction subscription.
+ * The configuration is: subscription name COMPACTION_SUBSCRIPTION, durable subscription mode,
+ * start message id and initial position set to earliest, no rollback duration, and readCompacted enabled.
+ *
+ * @param client the client used to create the underlying consumer
+ * @param schema the schema of the messages to read
+ * @param topic the topic to read
+ * @param consumerFuture future handed to ReaderImpl for the underlying consumer
+ * @param cryptoKeyReader key reader set on the reader configuration (callers in this change may pass null)
+ * @return the new reader
+ */
+ public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, Schema<T> schema, String topic,
+ CompletableFuture<Consumer<T>> consumerFuture,
+ CryptoKeyReader cryptoKeyReader) {
+ ReaderConfigurationData<T> conf = new ReaderConfigurationData<>();
+ conf.setTopicName(topic);
+ conf.setSubscriptionName(COMPACTION_SUBSCRIPTION);
+ conf.setStartMessageId(MessageId.earliest);
+ conf.setStartMessageFromRollbackDurationInSec(0);
+ conf.setReadCompacted(true);
+ conf.setSubscriptionMode(SubscriptionMode.Durable);
+ conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+ conf.setCryptoKeyReader(cryptoKeyReader);
+ return new CompactionReaderImpl<>(client, conf, client.externalExecutorProvider(), consumerFuture, schema);
+ }
+
+
+ /** Receives the next message straight from the consumer; no acknowledgement is sent here. */
+ @Override
+ public Message<T> readNext() throws PulsarClientException {
+ return consumer.receive();
+ }
+
+ /** Same as {@link #readNext()} but passes the given timeout to the consumer's receive call. */
+ @Override
+ public Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientException {
+ return consumer.receive(timeout, unit);
+ }
+
+ /** Asynchronous variant of {@link #readNext()}; delegates to the consumer's receiveAsync(). */
+ @Override
+ public CompletableFuture<Message<T>> readNextAsync() {
+ return consumer.receiveAsync();
+ }
+
+ /** Returns the last message id as reported by the underlying consumer. */
+ public CompletableFuture<MessageId> getLastMessageIdAsync() {
+ return consumer.getLastMessageIdAsync();
+ }
+
+ /**
+ * Cumulatively acknowledges up to {@code messageId}, attaching {@code properties} to the ack.
+ *
+ * @param messageId the id to acknowledge cumulatively
+ * @param properties properties sent with the acknowledgement
+ * @return a future completed by the consumer's doAcknowledge call
+ */
+ public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties) {
+ // NOTE(review): the trailing null is presumably the (absent) transaction argument - confirm against ConsumerBase.
+ return consumer.doAcknowledge(messageId, CommandAck.AckType.Cumulative, properties, null);
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
new file mode 100644
index 00000000000..cf6b213155c
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.pulsar.common.api.proto.MessageIdData;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodecNone;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.protocol.Commands;
+
+/**
+ * A raw batch message container without producer. (Used for StrategicTwoPhaseCompactor)
+ *
+ * incoming single messages:
+ * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
+ *
+ * batched into single batch message:
+ * [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
+ */
+public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl {
+    // Created lazily in toByteBuf() the first time an encrypted message is serialized.
+    MessageCrypto msgCrypto;
+    // Encryption key names taken from the last batched message's encryption context.
+    Set<String> encryptionKeys;
+    // Supplied by the caller through setCryptoKeyReader(); required when messages are encrypted.
+    CryptoKeyReader cryptoKeyReader;
+
+    /**
+     * Creates a container that starts with no compression.
+     *
+     * @param maxNumMessagesInBatch batch size limit; only applied when positive, otherwise the
+     *                              limit inherited from BatchMessageContainerImpl is kept
+     */
+    public RawBatchMessageContainerImpl(int maxNumMessagesInBatch) {
+        super();
+        compressionType = CompressionType.NONE;
+        compressor = new CompressionCodecNone();
+        if (maxNumMessagesInBatch > 0) {
+            this.maxNumMessagesInBatch = maxNumMessagesInBatch;
+        }
+    }
+
+    /**
+     * Encrypts the given payload when a MessageCrypto has been initialized.
+     * Without one, the payload is returned untouched. Otherwise the input buffer is released and a
+     * newly allocated buffer holding the encrypted bytes is returned.
+     *
+     * @param compressedPayload the payload to encrypt
+     * @return the payload to serialize (either the input itself or a new encrypted buffer)
+     * @throws RuntimeException if encryption fails; both buffers are released and the batch is discarded
+     */
+    private ByteBuf encrypt(ByteBuf compressedPayload) {
+        if (msgCrypto == null) {
+            return compressedPayload;
+        }
+        int maxSize = msgCrypto.getMaxOutputSize(compressedPayload.readableBytes());
+        ByteBuf encryptedPayload = allocator.buffer(maxSize);
+        ByteBuffer targetBuffer = encryptedPayload.nioBuffer(0, maxSize);
+
+        try {
+            msgCrypto.encrypt(encryptionKeys, cryptoKeyReader, () -> messageMetadata,
+                    compressedPayload.nioBuffer(), targetBuffer);
+        } catch (PulsarClientException e) {
+            encryptedPayload.release();
+            compressedPayload.release();
+            discard(e);
+            throw new RuntimeException("Failed to encrypt payload", e);
+        }
+        // NOTE(review): presumably encrypt() leaves targetBuffer positioned so that remaining()
+        // equals the number of encrypted bytes written - confirm against MessageCrypto.
+        encryptedPayload.writerIndex(targetBuffer.remaining());
+        compressedPayload.release();
+        return encryptedPayload;
+    }
+
+    /** Not supported: this container has no producer to send through. */
+    @Override
+    public ProducerImpl.OpSendMsg createOpSendMsg() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Sets a CryptoKeyReader instance to encrypt batched messages during serialization, `toByteBuf()`.
+     * @param cryptoKeyReader a CryptoKeyReader instance
+     */
+    public void setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+        this.cryptoKeyReader = cryptoKeyReader;
+    }
+
+    /**
+     * Serializes the batched messages and return the ByteBuf.
+     * It sets the CompressionType and Encryption Keys from the batched messages.
+     * If successful, it calls `clear()` at the end to release buffers from this container.
+     * The container must hold at least one message: the last batched message supplies the
+     * message id, compression type and encryption context.
+     *
+     * The returned byte buffer follows this format:
+     * [IdSize][Id][metadataAndPayloadSize][metadataAndPayload].
+     * This format is the same as RawMessage.serialize()'s format
+     * as the compacted messages is deserialized as RawMessage in broker.
+     *
+     * It throws the following runtime exceptions from encryption:
+     * IllegalStateException if cryptoKeyReader is not set for encrypted messages.
+     * IllegalArgumentException if encryption key init fails.
+     * RuntimeException if message encryption fails.
+     *
+     * @return a ByteBuf instance
+     */
+    public ByteBuf toByteBuf() {
+        if (numMessagesInBatch > 1) {
+            messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
+            messageMetadata.setSequenceId(lowestSequenceId);
+            messageMetadata.setHighestSequenceId(highestSequenceId);
+        }
+        MessageImpl lastMessage = messages.get(messages.size() - 1);
+        MessageIdImpl lastMessageId = (MessageIdImpl) lastMessage.getMessageId();
+        MessageMetadata lastMessageMetadata = lastMessage.getMessageBuilder();
+
+        this.compressionType = lastMessageMetadata.getCompression();
+        this.compressor = CompressionCodecProvider.getCompressionCodec(lastMessageMetadata.getCompression());
+
+        if (!lastMessage.getEncryptionCtx().isEmpty()) {
+            EncryptionContext encryptionContext = (EncryptionContext) lastMessage.getEncryptionCtx().get();
+
+            if (cryptoKeyReader == null) {
+                IllegalStateException ex =
+                        new IllegalStateException("Messages are encrypted but no cryptoKeyReader is provided.");
+                discard(ex);
+                throw ex;
+            }
+
+            encryptionKeys = encryptionContext.getKeys().keySet();
+            if (msgCrypto == null) {
+                msgCrypto =
+                        new MessageCryptoBc(String.format(
+                                "[%s] [%s]", topicName, "RawBatchMessageContainer"), true);
+                try {
+                    msgCrypto.addPublicKeyCipher(encryptionKeys, cryptoKeyReader);
+                } catch (PulsarClientException.CryptoException e) {
+                    discard(e);
+                    throw new IllegalArgumentException("Failed to set encryption keys", e);
+                }
+            }
+        }
+
+        ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload());
+        updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
+        ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
+                messageMetadata, encryptedPayload);
+
+        MessageIdData idData = new MessageIdData();
+        idData.setLedgerId(lastMessageId.getLedgerId());
+        idData.setEntryId(lastMessageId.getEntryId());
+        idData.setPartition(lastMessageId.getPartitionIndex());
+
+        // Format: [IdSize][Id][metadataAndPayloadSize][metadataAndPayload]
+        // Following RawMessage.serialize() format as the compacted messages will be parsed as RawMessage in broker
+        int idSize = idData.getSerializedSize();
+        int headerSize = 4 /* IdSize */ + idSize + 4 /* metadataAndPayloadSize */;
+        int totalSize = headerSize + metadataAndPayload.readableBytes();
+        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(totalSize);
+        buf.writeInt(idSize);
+        idData.writeTo(buf);
+        buf.writeInt(metadataAndPayload.readableBytes());
+        buf.writeBytes(metadataAndPayload);
+        // writeBytes() copies the content into buf, so the intermediate buffer is no longer needed;
+        // release it here to avoid leaking one buffer per serialized batch.
+        metadataAndPayload.release();
+        encryptedPayload.release();
+        clear();
+        return buf;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
index 522618f9d6a..e93a642c76e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
@@ -40,8 +40,8 @@ public abstract class Compactor {
protected final ServiceConfiguration conf;
protected final ScheduledExecutorService scheduler;
- private final PulsarClient pulsar;
- private final BookKeeper bk;
+ protected final PulsarClient pulsar;
+ protected final BookKeeper bk;
protected final CompactorMXBeanImpl mxBean;
public Compactor(ServiceConfiguration conf,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
new file mode 100644
index 00000000000..bb0850efab4
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import io.netty.buffer.ByteBuf;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.CompactionReaderImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compaction goes through the topic in two passes. The first pass
+ * selects, for each key, the message to keep (as decided by
+ * TopicCompactionStrategy.shouldKeepLeft()). Then, the second pass writes
+ * these messages to a ledger.
+ *
+ * <p>As the first pass caches the entire message(not just offset) for each key into a map,
+ * this compaction could be memory intensive if the message payload is large.
+ */
+public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
+ private static final Logger log = LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
+ private static final int MAX_OUTSTANDING = 500;
+ private final Duration phaseOneLoopReadTimeout;
+ private final RawBatchMessageContainerImpl batchMessageContainer;
+
+ /**
+ * Creates a compactor with an explicit batch size limit.
+ * The phase-one read timeout is taken from
+ * {@code conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds()}.
+ *
+ * @param maxNumMessagesInBatch limit passed to the RawBatchMessageContainerImpl used for phase two
+ */
+ public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+ PulsarClient pulsar,
+ BookKeeper bk,
+ ScheduledExecutorService scheduler,
+ int maxNumMessagesInBatch) {
+ super(conf, pulsar, bk, scheduler);
+ batchMessageContainer = new RawBatchMessageContainerImpl(maxNumMessagesInBatch);
+ phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+ }
+
+ /**
+ * Creates a compactor without an explicit batch size limit: -1 is passed as maxNumMessagesInBatch.
+ */
+ public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+ PulsarClient pulsar,
+ BookKeeper bk,
+ ScheduledExecutorService scheduler) {
+ this(conf, pulsar, bk, scheduler, -1);
+ }
+
+ /**
+ * Not supported by this compactor: a TopicCompactionStrategy is required,
+ * use one of the compact overloads that accepts a strategy.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ public CompletableFuture<Long> compact(String topic) {
+ throw new UnsupportedOperationException();
+ }
+
+
+ /**
+ * Compacts {@code topic} with the given strategy and no CryptoKeyReader.
+ *
+ * @return the future returned by the three-argument overload
+ */
+ public <T> CompletableFuture<Long> compact(String topic,
+ TopicCompactionStrategy<T> strategy) {
+ return compact(topic, strategy, null);
+ }
+
+ /**
+ * Compacts {@code topic} with the given strategy.
+ * A CompactionReaderImpl is created with the strategy's schema; once its consumer future completes,
+ * the compaction runs on the scheduler and the reader is closed afterwards.
+ *
+ * @param topic the topic to compact
+ * @param strategy the strategy deciding which message is kept for each key
+ * @param cryptoKeyReader key reader for the reader and the batch container; may be null
+ * @return a future for the compaction result (see doCompaction for the possible values)
+ */
+ public <T> CompletableFuture<Long> compact(String topic,
+ TopicCompactionStrategy<T> strategy,
+ CryptoKeyReader cryptoKeyReader) {
+ CompletableFuture<Consumer<T>> consumerFuture = new CompletableFuture<>();
+ // NOTE(review): the batch container is a single instance shared by every compact() call, and a null
+ // cryptoKeyReader leaves whatever reader a previous call installed - confirm this is intended.
+ if (cryptoKeyReader != null) {
+ batchMessageContainer.setCryptoKeyReader(cryptoKeyReader);
+ }
+ CompactionReaderImpl reader = CompactionReaderImpl.create(
+ (PulsarClientImpl) pulsar, strategy.getSchema(), topic, consumerFuture, cryptoKeyReader);
+
+ return consumerFuture.thenComposeAsync(__ -> compactAndCloseReader(reader, strategy), scheduler);
+ }
+
+    /**
+     * Runs phase one and phase two for the reader's topic.
+     *
+     * @param reader   the reader to compact from; must be a CompactionReaderImpl
+     * @param strategy the strategy deciding which message is kept for each key
+     * @return a future holding the id of the ledger written by phase two, or -1 when the reader
+     *         reports no available message; fails with IllegalStateException for any other reader type
+     */
+    <T> CompletableFuture<Long> doCompaction(Reader<T> reader, TopicCompactionStrategy strategy) {
+
+        if (!(reader instanceof CompactionReaderImpl<T>)) {
+            // The message names the type that is actually checked above.
+            return CompletableFuture.failedFuture(
+                    new IllegalStateException("reader has to be CompactionReaderImpl"));
+        }
+        return reader.hasMessageAvailableAsync()
+                .thenCompose(available -> {
+                    if (available) {
+                        return phaseOne(reader, strategy)
+                                .thenCompose((result) -> phaseTwo(result, reader, bk));
+                    } else {
+                        log.info("Skip compaction of the empty topic {}", reader.getTopic());
+                        return CompletableFuture.completedFuture(-1L);
+                    }
+                });
+    }
+
+ /**
+ * Runs doCompaction and then closes the reader, whether or not the compaction succeeded.
+ * Compaction start/end operations are reported to the mxBean for the reader's topic.
+ *
+ * @return a future completed with the compaction result, or failed with the compaction's own exception
+ */
+ <T> CompletableFuture<Long> compactAndCloseReader(Reader<T> reader, TopicCompactionStrategy strategy) {
+ CompletableFuture<Long> promise = new CompletableFuture<>();
+ mxBean.addCompactionStartOp(reader.getTopic());
+ doCompaction(reader, strategy).whenComplete(
+ (ledgerId, exception) -> {
+ // Logged on failure as well; ledgerId is null in that case.
+ log.info("Completed doCompaction ledgerId:{}", ledgerId);
+ reader.closeAsync().whenComplete((v, exception2) -> {
+ // A failure to close the reader is only logged; it never changes the outcome of the promise.
+ if (exception2 != null) {
+ log.warn("Error closing reader handle {}, ignoring", reader, exception2);
+ }
+ if (exception != null) {
+ // complete with original exception
+ mxBean.addCompactionEndOp(reader.getTopic(), false);
+ promise.completeExceptionally(exception);
+ } else {
+ mxBean.addCompactionEndOp(reader.getTopic(), true);
+ promise.complete(ledgerId);
+ }
+ });
+ });
+ return promise;
+ }
+
+    /**
+     * Applies one message read in phase one to the per-key cache.
+     * Messages without a key are released and ignored. Otherwise the strategy compares the cached
+     * value for the key (null when absent) with the new value: when the cached one is kept, the new
+     * message is released; when it is not, the cached message is released and either replaced by
+     * the new message (moved to the end of the cache order) or, for a null/empty new message, removed.
+     *
+     * @param msg      the message just read
+     * @param result   phase-one state holding the cache and the counters
+     * @param strategy the strategy deciding whether the cached value is kept
+     * @return false when the strategy kept the cached value, true otherwise (including keyless messages)
+     */
+    private <T> boolean doCompactMessage(
+            Message<T> msg, PhaseOneResult<T> result, TopicCompactionStrategy<T> strategy) {
+        String key = msg.getKey();
+        if (key == null) {
+            msg.release();
+            return true;
+        }
+
+        Map<String, Message<T>> cache = result.cache;
+        T incoming = msg.getValue();
+        Message<T> cached = cache.get(key);
+        T cachedValue = cached == null ? null : cached.getValue();
+
+        if (strategy.shouldKeepLeft(cachedValue, incoming)) {
+            msg.release();
+            result.invalidCompactionCount.incrementAndGet();
+            return false;
+        }
+
+        boolean emptyMessage = incoming == null || msg.size() <= 0;
+        cache.remove(key); // removing first also moves a re-inserted key to the end (to reorder)
+        if (emptyMessage) {
+            msg.release();
+        } else {
+            cache.put(key, msg);
+        }
+
+        if (cached != null) {
+            cached.release();
+        }
+
+        result.validCompactionCount.incrementAndGet();
+        return true;
+    }
+
+    /** Mutable state collected during phase one and consumed by phase two. */
+    private static class PhaseOneResult<T> {
+        /** Id of the first message read in phase one. */
+        MessageId firstId;
+        /** Last message id reported by the reader when phase one started; phase one stops there. */
+        MessageId lastId;
+        /** Message kept for each key, in insertion order. */
+        Map<String, Message<T>> cache;
+        /** Number of messages rejected because the strategy kept the cached value. */
+        AtomicInteger invalidCompactionCount;
+        /** Number of messages that replaced or removed a cached entry. */
+        AtomicInteger validCompactionCount;
+        /** Number of messages read in phase one. */
+        AtomicInteger numReadMessages;
+        /** Topic being compacted. */
+        String topic;
+
+        PhaseOneResult(String topic) {
+            this.topic = topic;
+            this.cache = new LinkedHashMap<>();
+            this.invalidCompactionCount = new AtomicInteger();
+            this.validCompactionCount = new AtomicInteger();
+            this.numReadMessages = new AtomicInteger();
+        }
+
+        @Override
+        public String toString() {
+            String first = firstId == null ? "" : firstId.toString();
+            String last = lastId == null ? "" : lastId.toString();
+            return String.format(
+                    "{Topic:%s, firstId:%s, lastId:%s, cache.size:%d, "
+                            + "invalidCompactionCount:%d, validCompactionCount:%d, numReadMessages:%d}",
+                    topic, first, last, cache.size(),
+                    invalidCompactionCount.get(), validCompactionCount.get(), numReadMessages.get());
+        }
+    }
+
+
+ /**
+ * Starts phase one: fetches the reader's last message id, stores a copy as the stop position
+ * and then reads messages into the result's cache via phaseOneLoop.
+ *
+ * @return a future completed with the phase-one result, or failed if fetching the last message id
+ * or the read loop fails
+ */
+ private <T> CompletableFuture<PhaseOneResult> phaseOne(Reader<T> reader, TopicCompactionStrategy strategy) {
+ CompletableFuture<PhaseOneResult> promise = new CompletableFuture<>();
+ PhaseOneResult<T> result = new PhaseOneResult(reader.getTopic());
+
+ ((CompactionReaderImpl<T>) reader).getLastMessageIdAsync()
+ .thenAccept(lastMessageId -> {
+ log.info("Commencing phase one of compaction for {}, reading to {}",
+ reader.getTopic(), lastMessageId);
+ result.lastId = copyMessageId(lastMessageId);
+ phaseOneLoop(reader, promise, result, strategy);
+ }).exceptionally(ex -> {
+ promise.completeExceptionally(ex);
+ return null;
+ });
+
+ return promise;
+
+ }
+
+    /**
+     * Returns a new message id instance equivalent to {@code msgId}.
+     *
+     * @param msgId a BatchMessageIdImpl or MessageIdImpl
+     * @return a BatchMessageIdImpl built from the batch id, or a MessageIdImpl carrying the same
+     *         ledger id, entry id and partition index
+     * @throws IllegalStateException if {@code msgId} is of any other type
+     */
+    private static MessageId copyMessageId(MessageId msgId) {
+        // The batch type is tested first, exactly as before, so batch ids keep their batch information.
+        if (msgId instanceof BatchMessageIdImpl) {
+            return new BatchMessageIdImpl((BatchMessageIdImpl) msgId);
+        }
+        if (!(msgId instanceof MessageIdImpl)) {
+            throw new IllegalStateException("Unknown lastMessageId type");
+        }
+        MessageIdImpl source = (MessageIdImpl) msgId;
+        return new MessageIdImpl(source.getLedgerId(), source.getEntryId(), source.getPartitionIndex());
+    }
+
+ /**
+ * Reads one message and feeds it to doCompactMessage, then schedules itself again until the message
+ * whose id equals {@code result.lastId} has been processed. Does nothing once {@code promise} is done.
+ *
+ * @param reader the reader to read from
+ * @param promise completed with {@code result} when the last message is reached, or failed on any error
+ * (including a read that exceeds phaseOneLoopReadTimeout)
+ * @param result phase-one state updated in place
+ * @param strategy the strategy passed to doCompactMessage
+ */
+ private <T> void phaseOneLoop(Reader<T> reader, CompletableFuture<PhaseOneResult> promise,
+ PhaseOneResult<T> result, TopicCompactionStrategy<T> strategy) {
+
+ if (promise.isDone()) {
+ return;
+ }
+
+ CompletableFuture<Message<T>> future = reader.readNextAsync();
+ FutureUtil.addTimeoutHandling(future,
+ phaseOneLoopReadTimeout, scheduler,
+ () -> FutureUtil.createTimeoutException("Timeout", getClass(),
+ "phaseOneLoop(...)"));
+
+ future.thenAcceptAsync(msg -> {
+
+ // Capture the id first: doCompactMessage may release msg.
+ MessageId id = msg.getMessageId();
+ boolean completed = false;
+ if (result.lastId.compareTo(id) == 0) {
+ completed = true;
+ }
+
+ result.numReadMessages.incrementAndGet();
+ mxBean.addCompactionReadOp(reader.getTopic(), msg.size());
+ if (doCompactMessage(msg, result, strategy)) {
+ mxBean.addCompactionRemovedEvent(reader.getTopic());
+ }
+ //set ids in the result
+ if (result.firstId == null) {
+ result.firstId = copyMessageId(id);
+ log.info("Resetting cursor to firstId:{}", result.firstId);
+ // NOTE(review): the reader is sought back to the first id right after that message was processed;
+ // whether the message is then delivered again depends on seek semantics - confirm.
+ try {
+ reader.seek(result.firstId);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException("Failed to reset the cursor to firstId:" + result.firstId, e);
+ }
+ }
+ if (completed) {
+ promise.complete(result);
+ } else {
+ phaseOneLoop(reader, promise, result, strategy);
+ }
+
+ }, scheduler).exceptionally(ex -> {
+ promise.completeExceptionally(ex);
+ return null;
+ });
+
+ }
+
+ /**
+ * Starts phase two: creates the compacted ledger (its metadata is built from the topic name and the
+ * phase-one last message id) and then writes the cached messages to it via runPhaseTwo.
+ *
+ * @return the future returned by runPhaseTwo, or a failed future if the ledger cannot be created
+ */
+ private <T> CompletableFuture<Long> phaseTwo(PhaseOneResult<T> phaseOneResult, Reader<T> reader, BookKeeper bk) {
+ log.info("Completed phase one. Result:{}. ", phaseOneResult);
+ Map<String, byte[]> metadata =
+ LedgerMetadataUtils.buildMetadataForCompactedLedger(
+ phaseOneResult.topic, phaseOneResult.lastId.toByteArray());
+ return createLedger(bk, metadata)
+ .thenCompose((ledger) -> {
+ log.info(
+ "Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
+ phaseOneResult.topic, phaseOneResult.firstId, phaseOneResult.lastId,
+ phaseOneResult.cache.size(), ledger.getId());
+ return runPhaseTwo(phaseOneResult, reader, ledger, bk);
+ });
+ }
+
+    /**
+     * Writes every cached message to {@code ledger}, flushes the batch container, cumulatively
+     * acknowledges {@code phaseOneResult.lastId} with the ledger id attached as the
+     * COMPACTED_TOPIC_LEDGER_PROPERTY property, and finally closes the ledger.
+     * If any of these steps fails, the ledger is deleted and the returned future fails with the
+     * original exception.
+     *
+     * @param phaseOneResult the cache and ids produced by phase one
+     * @param reader         the compaction reader used for the acknowledgement
+     * @param ledger         the ledger receiving the compacted entries
+     * @param bk             the BookKeeper client used to delete the ledger on failure
+     * @return a future completed with the id of the compacted ledger
+     */
+    private <T> CompletableFuture<Long> runPhaseTwo(
+            PhaseOneResult<T> phaseOneResult, Reader<T> reader, LedgerHandle ledger, BookKeeper bk) {
+        CompletableFuture<Long> promise = new CompletableFuture<>();
+        Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
+        CompletableFuture<Void> loopPromise = new CompletableFuture<>();
+        phaseTwoLoop(phaseOneResult.topic, phaseOneResult.cache.values().iterator(), ledger,
+                outstanding, loopPromise);
+        loopPromise.thenCompose((v) -> {
+                    log.info("Flushing batch container numMessagesInBatch:{}",
+                            batchMessageContainer.getNumMessagesInBatch());
+                    // A null message asks addToCompactedLedger to write whatever is still batched.
+                    return addToCompactedLedger(ledger, null, reader.getTopic(), outstanding)
+                            .whenComplete((res, exception2) -> {
+                                if (exception2 != null) {
+                                    promise.completeExceptionally(exception2);
+                                }
+                            });
+                })
+                // NOTE(review): the ack that publishes the ledger id happens before the ledger is
+                // closed; if closing then fails, the ledger is deleted after it was already
+                // advertised - confirm this ordering is intended.
+                .thenCompose(v -> {
+                    // Log the ledger being advertised by the ack (previously logged the first message id).
+                    log.info("Acking ledger id {}", ledger.getId());
+                    return ((CompactionReaderImpl<T>) reader)
+                            .acknowledgeCumulativeAsync(
+                                    phaseOneResult.lastId, Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY,
+                                            ledger.getId()));
+                })
+                .thenCompose((v) -> closeLedger(ledger))
+                .whenComplete((v, exception) -> {
+                    if (exception != null) {
+                        deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
+                            if (exception2 != null) {
+                                log.error("Cleanup of ledger {} for failed", ledger, exception2);
+                            }
+                            // complete with original exception
+                            promise.completeExceptionally(exception);
+                        });
+                    } else {
+                        log.info("kept ledger:{}", ledger.getId());
+                        promise.complete(ledger.getId());
+                    }
+                });
+
+        return promise;
+    }
+
+    /**
+     * Hands the cached messages to addToCompactedLedger one at a time, each step running as a task
+     * on the scheduler. When the iterator is exhausted it waits for every in-flight ledger write to
+     * finish and then completes {@code promise}. Does nothing once {@code promise} is done.
+     *
+     * @param topic       the topic being compacted (used for metrics)
+     * @param reader      iterator over the messages kept by phase one
+     * @param lh          the ledger receiving the compacted entries
+     * @param outstanding semaphore bounding the number of in-flight ledger writes
+     * @param promise     completed with null when all messages were handed over and all in-flight
+     *                    writes finished, or failed on the first error
+     */
+    private <T> void phaseTwoLoop(String topic, Iterator<Message<T>> reader,
+                                  LedgerHandle lh, Semaphore outstanding,
+                                  CompletableFuture<Void> promise) {
+        if (promise.isDone()) {
+            return;
+        }
+        CompletableFuture.runAsync(() -> {
+                    if (reader.hasNext()) {
+                        Message<T> message = reader.next();
+                        mxBean.addCompactionReadOp(topic, message.size());
+                        addToCompactedLedger(lh, message, topic, outstanding)
+                                .whenComplete((res, exception2) -> {
+                                    if (exception2 != null) {
+                                        promise.completeExceptionally(exception2);
+                                    }
+                                });
+                        phaseTwoLoop(topic, reader, lh, outstanding, promise);
+                    } else {
+                        try {
+                            // Acquiring every permit is only a barrier: it returns once all
+                            // in-flight writes have released theirs.
+                            outstanding.acquire(MAX_OUTSTANDING);
+                        } catch (InterruptedException e) {
+                            // Preserve the interrupt for the executor thread.
+                            Thread.currentThread().interrupt();
+                            promise.completeExceptionally(e);
+                            return;
+                        }
+                        // Give the permits back; otherwise the final flush of a non-empty batch
+                        // (which acquires one permit) would block forever.
+                        outstanding.release(MAX_OUTSTANDING);
+                        promise.complete(null);
+                    }
+                }, scheduler)
+                .exceptionally(ex -> {
+                    promise.completeExceptionally(ex);
+                    return null;
+                });
+    }
+
+    /**
+     * Adds a message to the batch container and, when a batch is ready, writes it to the ledger.
+     * A batch is written when {@code m} is null (explicit flush) or when adding {@code m} makes the
+     * container report that it should be flushed, provided the container holds at least one message.
+     *
+     * @param lh          the ledger receiving the compacted entries
+     * @param m           the message to add, or null to flush whatever is batched
+     * @param topic       the topic being compacted (used for metrics)
+     * @param outstanding semaphore bounding in-flight writes; one permit is taken per write and
+     *                    released in the write callback
+     * @return a future completed with true once a batch was written, with false when nothing was
+     *         written, or failed if serialization or the ledger write fails
+     */
+    <T> CompletableFuture<Boolean> addToCompactedLedger(
+            LedgerHandle lh, Message<T> m, String topic, Semaphore outstanding) {
+        boolean flush = m == null || batchMessageContainer.add((MessageImpl<?>) m, null);
+        if (!flush || batchMessageContainer.getNumMessagesInBatch() <= 0) {
+            return CompletableFuture.completedFuture(false);
+        }
+
+        CompletableFuture<Boolean> bkf = new CompletableFuture<>();
+        try {
+            ByteBuf serialized = batchMessageContainer.toByteBuf();
+            outstanding.acquire();
+            mxBean.addCompactionWriteOp(topic, serialized.readableBytes());
+            long start = System.nanoTime();
+            lh.asyncAddEntry(serialized,
+                    (rc, ledger, eid, ctx) -> {
+                        outstanding.release();
+                        mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS);
+                        if (rc != BKException.Code.OK) {
+                            bkf.completeExceptionally(BKException.create(rc));
+                        } else {
+                            bkf.complete(true);
+                        }
+                    }, null);
+        } catch (Throwable t) {
+            log.error("Failed to add entry", t);
+            if (t instanceof InterruptedException) {
+                // Preserve the interrupt for the executor thread.
+                Thread.currentThread().interrupt();
+            }
+            // discard() takes an Exception; wrap Errors instead of casting, which would throw
+            // ClassCastException and hide the real failure.
+            batchMessageContainer.discard(t instanceof Exception ? (Exception) t : new RuntimeException(t));
+            return FutureUtil.failedFuture(t);
+        }
+        return bkf;
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 84107190fd3..821dd9c0c9d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
public class TwoPhaseCompactor extends Compactor {
private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
- private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
+ protected static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
private final Duration phaseOneLoopReadTimeout;
public TwoPhaseCompactor(ServiceConfiguration conf,
@@ -309,7 +309,7 @@ public class TwoPhaseCompactor extends Compactor {
});
}
- private CompletableFuture<LedgerHandle> createLedger(BookKeeper bk, Map<String, byte[]> metadata) {
+ protected CompletableFuture<LedgerHandle> createLedger(BookKeeper bk, Map<String, byte[]> metadata) {
CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
try {
@@ -332,7 +332,7 @@ public class TwoPhaseCompactor extends Compactor {
return bkf;
}
- private CompletableFuture<Void> deleteLedger(BookKeeper bk, LedgerHandle lh) {
+ protected CompletableFuture<Void> deleteLedger(BookKeeper bk, LedgerHandle lh) {
CompletableFuture<Void> bkf = new CompletableFuture<>();
try {
bk.asyncDeleteLedger(lh.getId(),
@@ -349,7 +349,7 @@ public class TwoPhaseCompactor extends Compactor {
return bkf;
}
- private CompletableFuture<Void> closeLedger(LedgerHandle lh) {
+ protected CompletableFuture<Void> closeLedger(LedgerHandle lh) {
CompletableFuture<Void> bkf = new CompletableFuture<>();
try {
lh.asyncClose((rc, ledger, ctx) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactionReaderImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactionReaderImplTest.java
new file mode 100644
index 00000000000..6a490c918e7
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactionReaderImplTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import com.google.common.collect.Sets;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class CompactionReaderImplTest extends MockedPulsarServiceBaseTest {
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ super.internalSetup();
+ admin.clusters().createCluster("test",
+ ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ admin.tenants().createTenant("my-property",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+ }
+
+ /**
+  * Delegates to the base-class cleanup after every test method; {@code alwaysRun = true} is set so
+  * that it is not skipped when an earlier method failed.
+  */
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void test() throws Exception {
+
+ String topic = "persistent://my-property/my-ns/my-compact-topic";
+ int numKeys = 5;
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+ for (int i = 0; i < numKeys; i++) {
+ producer.newMessage().key("key:" + i).value("value" + i).send();
+ }
+
+ @Cleanup
+ CompactionReaderImpl<String> reader = CompactionReaderImpl
+ .create((PulsarClientImpl) pulsarClient, Schema.STRING, topic, new CompletableFuture(), null);
+
+ ConsumerBase consumerBase = spy(reader.getConsumer());
+ FieldUtils.writeDeclaredField(
+ reader, "consumer", consumerBase, true);
+
+ ReaderConfigurationData readerConfigurationData =
+ (ReaderConfigurationData) FieldUtils.readDeclaredField(
+ reader, "readerConfiguration", true);
+
+
+ ReaderConfigurationData expected = new ReaderConfigurationData<>();
+ expected.setTopicName(topic);
+ expected.setSubscriptionName(COMPACTION_SUBSCRIPTION);
+ expected.setStartMessageId(MessageId.earliest);
+ expected.setStartMessageFromRollbackDurationInSec(0);
+ expected.setReadCompacted(true);
+ expected.setSubscriptionMode(SubscriptionMode.Durable);
+ expected.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+
+ MessageIdImpl lastMessageId = (MessageIdImpl) reader.getLastMessageIdAsync().get();
+ MessageIdImpl id = null;
+ MessageImpl m = null;
+
+ Assert.assertEquals(readerConfigurationData, expected);
+ for (int i = 0; i < numKeys; i++) {
+ m = (MessageImpl) reader.readNextAsync().get();
+ id = (MessageIdImpl) m.getMessageId();
+ }
+ Assert.assertEquals(id, lastMessageId);
+ verify(consumerBase, times(0))
+ .acknowledgeCumulativeAsync(Mockito.any(MessageId.class));
+
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index e8e4355346e..3ec784e248c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -152,6 +152,14 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase {
}).when(mockAllocator).buffer(anyInt());
final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator);
+ /* Without `batchMessageContainer.setProducer(producer);` it throws NPE since producer is null, and
+ eventually sendAsync() catches this NPE and releases the memory and semaphore.
+ } catch (Throwable t) {
+ completeCallbackAndReleaseSemaphore(uncompressedSize, callback,
+ new PulsarClientException(t, msg.getSequenceId()));
+ }
+ */
+ batchMessageContainer.setProducer(producer);
Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
batchMessageContainerField.setAccessible(true);
batchMessageContainerField.set(spyProducer, batchMessageContainer);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 1d4f8c9b674..2f8cb655401 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -305,6 +305,14 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase {
Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
batchMessageContainerField.setAccessible(true);
batchMessageContainerField.set(spyProducer, batchMessageContainer);
+ /* Without `batchMessageContainer.setProducer(producer);` it throws NPE since producer is null, and
+ eventually sendAsync() catches this NPE and releases the memory and semaphore.
+ } catch (Throwable t) {
+ completeCallbackAndReleaseSemaphore(uncompressedSize, callback,
+ new PulsarClientException(t, msg.getSequenceId()));
+ }
+ */
+ batchMessageContainer.setProducer(producer);
spyProducer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
Assert.fail("can not reach here");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
new file mode 100644
index 00000000000..9fa834a166c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+
+import static org.apache.pulsar.common.api.proto.CompressionType.LZ4;
+import static org.apache.pulsar.common.api.proto.CompressionType.NONE;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.pulsar.common.api.proto.MessageIdData;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.compaction.CompactionTest;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class RawBatchMessageContainerImplTest {
+ CompressionType compressionType;
+ MessageCrypto msgCrypto;
+ CryptoKeyReader cryptoKeyReader;
+ Map<String, EncryptionContext.EncryptionKey> encryptKeys;
+
+    /**
+     * Configures the fixture fields used by {@code createMessage} and the tests.
+     *
+     * @param encrypt  when {@code true}, installs a key reader, a {@link MessageCryptoBc} instance and a
+     *                 single encryption key named "client-ecdsa.pem"; when {@code false}, clears all three
+     * @param compress when {@code true}, selects LZ4 compression, otherwise NONE
+     */
+    public void setEncryptionAndCompression(boolean encrypt, boolean compress) {
+        compressionType = compress ? LZ4 : NONE;
+
+        if (!encrypt) {
+            msgCrypto = null;
+            cryptoKeyReader = null;
+            encryptKeys = null;
+            return;
+        }
+
+        cryptoKeyReader = new CompactionTest.EncKeyReader();
+        msgCrypto = new MessageCryptoBc("test", false);
+        String keyName = "client-ecdsa.pem";
+        EncryptionKeyInfo publicKeyInfo = cryptoKeyReader.getPublicKey(keyName, null);
+        EncryptionContext.EncryptionKey encryptionKey =
+                new EncryptionContext.EncryptionKey(publicKeyInfo.getKey(), publicKeyInfo.getMetadata());
+        encryptKeys = Map.of(keyName, encryptionKey);
+    }
+
+    /**
+     * Builds a String-schema message whose id is (ledger 0, {@code entryId}, partition -1) and whose
+     * sequence id equals {@code entryId}.
+     *
+     * <p>The current {@code compressionType} (if set) is recorded in the metadata, and the current
+     * {@code encryptKeys} (if set) are attached through an {@link EncryptionContext}.
+     *
+     * @param topic   topic name stored on the message
+     * @param value   payload text, encoded with the platform default charset
+     * @param entryId entry id and sequence id of the message
+     * @return the constructed message
+     */
+    public MessageImpl createMessage(String topic, String value, int entryId) {
+        MessageIdImpl messageId = new MessageIdImpl(0, entryId, -1);
+
+        MessageMetadata metadata = new MessageMetadata()
+                .setPublishTime(System.currentTimeMillis())
+                .setProducerName("test")
+                .setSequenceId(entryId);
+        if (compressionType != null) {
+            metadata.setCompression(compressionType);
+        }
+
+        Optional<EncryptionContext> encryptionContext = Optional.empty();
+        if (encryptKeys != null) {
+            EncryptionContext context = new EncryptionContext();
+            context.setKeys(encryptKeys);
+            encryptionContext = Optional.of(context);
+        }
+
+        ByteBuf payload = Unpooled.copiedBuffer(value.getBytes());
+        return new MessageImpl(topic, messageId, metadata, payload, encryptionContext, null, Schema.STRING);
+    }
+
+
+ /**
+  * Resets the fixture before each test to no encryption and no compression, so a test that enabled
+  * either does not affect the next one.
+  */
+ @BeforeMethod
+ public void setup() throws Exception {
+ setEncryptionAndCompression(false, false);
+ }
+    /**
+     * Serializes a two-message batch without compression or encryption and checks the message-id
+     * header, the batch metadata and both payloads read back from the resulting buffer.
+     */
+    @Test
+    public void testToByteBuf() throws IOException {
+        RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(2);
+        String topic = "my-topic";
+        container.add(createMessage(topic, "hi-1", 0), null);
+        container.add(createMessage(topic, "hi-2", 1), null);
+        ByteBuf buf = container.toByteBuf();
+
+
+        // The buffer is read back as: [id size][MessageIdData][metadata+payload size][metadata+payload].
+        int idSize = buf.readInt();
+        ByteBuf idBuf = buf.readBytes(idSize);
+        MessageIdData idData = new MessageIdData();
+        idData.parseFrom(idBuf, idSize);
+        Assert.assertEquals(idData.getLedgerId(), 0);
+        Assert.assertEquals(idData.getEntryId(), 1);
+        Assert.assertEquals(idData.getPartition(), -1);
+        // readBytes(int) returns a newly allocated buffer owned by the caller; release it once parsed.
+        idBuf.release();
+
+
+        int metadataAndPayloadSize = buf.readInt();
+        ByteBuf metadataAndPayload = buf.readBytes(metadataAndPayloadSize);
+        MessageImpl singleMessageMetadataAndPayload = MessageImpl.deserialize(metadataAndPayload);
+        MessageMetadata metadata = singleMessageMetadataAndPayload.getMessageBuilder();
+        Assert.assertEquals(metadata.getNumMessagesInBatch(), 2);
+        Assert.assertEquals(metadata.getHighestSequenceId(), 1);
+        Assert.assertEquals(metadata.getCompression(), NONE);
+
+        SingleMessageMetadata messageMetadata = new SingleMessageMetadata();
+        ByteBuf payload1 = Commands.deSerializeSingleMessageInBatch(
+                singleMessageMetadataAndPayload.getPayload(), messageMetadata, 0, 2);
+        ByteBuf payload2 = Commands.deSerializeSingleMessageInBatch(
+                singleMessageMetadataAndPayload.getPayload(), messageMetadata, 1, 2);
+
+        Assert.assertEquals(payload1.toString(Charset.defaultCharset()), "hi-1");
+        Assert.assertEquals(payload2.toString(Charset.defaultCharset()), "hi-2");
+        payload1.release();
+        payload2.release();
+        singleMessageMetadataAndPayload.release();
+        metadataAndPayload.release();
+        buf.release();
+    }
+
+    /**
+     * Serializes a two-message batch with LZ4 compression and encryption enabled, then decrypts and
+     * decompresses the batch payload and checks the message-id header, the batch metadata and both
+     * payloads.
+     */
+    @Test
+    public void testToByteBufWithCompressionAndEncryption() throws IOException {
+        setEncryptionAndCompression(true, true);
+
+        RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(2);
+        container.setCryptoKeyReader(cryptoKeyReader);
+        String topic = "my-topic";
+        container.add(createMessage(topic, "hi-1", 0), null);
+        container.add(createMessage(topic, "hi-2", 1), null);
+        ByteBuf buf = container.toByteBuf();
+
+        int idSize = buf.readInt();
+        ByteBuf idBuf = buf.readBytes(idSize);
+        MessageIdData idData = new MessageIdData();
+        idData.parseFrom(idBuf, idSize);
+        Assert.assertEquals(idData.getLedgerId(), 0);
+        Assert.assertEquals(idData.getEntryId(), 1);
+        Assert.assertEquals(idData.getPartition(), -1);
+        // readBytes(int) returns a newly allocated buffer owned by the caller; release it once parsed.
+        idBuf.release();
+
+        int metadataAndPayloadSize = buf.readInt();
+        ByteBuf metadataAndPayload = buf.readBytes(metadataAndPayloadSize);
+        MessageImpl singleMessageMetadataAndPayload = MessageImpl.deserialize(metadataAndPayload);
+
+        MessageMetadata metadata = singleMessageMetadataAndPayload.getMessageBuilder();
+        Assert.assertEquals(metadata.getNumMessagesInBatch(), 2);
+        Assert.assertEquals(metadata.getHighestSequenceId(), 1);
+        Assert.assertEquals(metadata.getCompression(), compressionType);
+
+        // Undo the container's work in reverse order: decrypt first, then decompress.
+        ByteBuf payload = singleMessageMetadataAndPayload.getPayload();
+        int maxDecryptedSize = msgCrypto.getMaxOutputSize(payload.readableBytes());
+        ByteBuffer decrypted = ByteBuffer.allocate(maxDecryptedSize);
+        msgCrypto.decrypt(() -> metadata, payload.nioBuffer(), decrypted, cryptoKeyReader);
+        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
+        ByteBuf uncompressed = codec.decode(Unpooled.wrappedBuffer(decrypted),
+                metadata.getUncompressedSize());
+        SingleMessageMetadata messageMetadata = new SingleMessageMetadata();
+
+        ByteBuf payload1 = Commands.deSerializeSingleMessageInBatch(
+                uncompressed, messageMetadata, 0, 2);
+        ByteBuf payload2 = Commands.deSerializeSingleMessageInBatch(
+                uncompressed, messageMetadata, 1, 2);
+
+        Assert.assertEquals(payload1.toString(Charset.defaultCharset()), "hi-1");
+        Assert.assertEquals(payload2.toString(Charset.defaultCharset()), "hi-2");
+        payload1.release();
+        payload2.release();
+        singleMessageMetadataAndPayload.release();
+        metadataAndPayload.release();
+        uncompressed.release();
+        buf.release();
+    }
+
+    /**
+     * Serializes a container holding a single message and checks the message-id header, the
+     * metadata and the payload read back from the resulting buffer.
+     */
+    @Test
+    public void testToByteBufWithSingleMessage() throws IOException {
+        RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(2);
+        String topic = "my-topic";
+        container.add(createMessage(topic, "hi-1", 0), null);
+        ByteBuf buf = container.toByteBuf();
+
+
+        int idSize = buf.readInt();
+        ByteBuf idBuf = buf.readBytes(idSize);
+        MessageIdData idData = new MessageIdData();
+        idData.parseFrom(idBuf, idSize);
+        Assert.assertEquals(idData.getLedgerId(), 0);
+        Assert.assertEquals(idData.getEntryId(), 0);
+        Assert.assertEquals(idData.getPartition(), -1);
+        // readBytes(int) returns a newly allocated buffer owned by the caller; release it once parsed.
+        idBuf.release();
+
+
+        int metadataAndPayloadSize = buf.readInt();
+        ByteBuf metadataAndPayload = buf.readBytes(metadataAndPayloadSize);
+        MessageImpl singleMessageMetadataAndPayload = MessageImpl.deserialize(metadataAndPayload);
+        MessageMetadata metadata = singleMessageMetadataAndPayload.getMessageBuilder();
+        Assert.assertEquals(metadata.getNumMessagesInBatch(), 1);
+        Assert.assertEquals(metadata.getHighestSequenceId(), 0);
+        Assert.assertEquals(metadata.getCompression(), NONE);
+
+        // With one message the payload is compared directly, without per-message deserialization.
+        Assert.assertEquals(singleMessageMetadataAndPayload.getPayload().toString(Charset.defaultCharset()), "hi-1");
+        singleMessageMetadataAndPayload.release();
+        metadataAndPayload.release();
+        buf.release();
+    }
+
+    /**
+     * A container constructed with 1 (the batch-size limit, going by the test name) must report
+     * itself full as soon as one message has been added.
+     */
+    @Test
+    public void testMaxNumMessagesInBatch() {
+        RawBatchMessageContainerImpl batch = new RawBatchMessageContainerImpl(1);
+
+        boolean fullAfterAdd = batch.add(createMessage("my-topic", "hi", 0), null);
+
+        Assert.assertTrue(fullAfterAdd);
+        Assert.assertTrue(batch.isBatchFull());
+    }
+
+    /** Calling {@code createOpSendMsg()} is expected to throw {@link UnsupportedOperationException}. */
+    @Test(expectedExceptions = UnsupportedOperationException.class)
+    public void testCreateOpSendMsg() {
+        new RawBatchMessageContainerImpl(1).createOpSendMsg();
+    }
+
+    /**
+     * When the message carries encryption keys but the container has no {@code CryptoKeyReader},
+     * {@code toByteBuf()} must throw {@link IllegalStateException} and leave the container empty.
+     */
+    @Test
+    public void testToByteBufWithEncryptionWithoutCryptoKeyReader() {
+        setEncryptionAndCompression(true, false);
+        RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1);
+        String topic = "my-topic";
+        container.add(createMessage(topic, "hi-1", 0), null);
+        Assert.assertEquals(container.getNumMessagesInBatch(), 1);
+        try {
+            container.toByteBuf();
+            // Fail explicitly rather than hitting a NullPointerException on a never-assigned exception.
+            Assert.fail("toByteBuf() should throw IllegalStateException without a CryptoKeyReader");
+        } catch (IllegalStateException ex) {
+            Assert.assertEquals(ex.getClass(), IllegalStateException.class);
+        }
+        // The failed call must still have cleared the batch state.
+        Assert.assertEquals(container.getNumMessagesInBatch(), 0);
+        Assert.assertNull(container.batchedMessageMetadataAndPayload);
+    }
+
+    /**
+     * When the message's encryption keys are invalid (a single {@code null -> null} entry),
+     * {@code toByteBuf()} must throw {@link IllegalArgumentException} and leave the container empty.
+     */
+    @Test
+    public void testToByteBufWithEncryptionWithInvalidEncryptKeys() {
+        setEncryptionAndCompression(true, false);
+        RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1);
+        container.setCryptoKeyReader(cryptoKeyReader);
+        // Replace the valid key map before the message is created, so the message carries the bad keys.
+        encryptKeys = new HashMap<>();
+        encryptKeys.put(null, null);
+        String topic = "my-topic";
+        container.add(createMessage(topic, "hi-1", 0), null);
+        Assert.assertEquals(container.getNumMessagesInBatch(), 1);
+        try {
+            container.toByteBuf();
+            // Fail explicitly rather than hitting a NullPointerException on a never-assigned exception.
+            Assert.fail("toByteBuf() should throw IllegalArgumentException for invalid encryption keys");
+        } catch (IllegalArgumentException ex) {
+            Assert.assertEquals(ex.getClass(), IllegalArgumentException.class);
+        }
+        // The failed call must still have cleared the batch state.
+        Assert.assertEquals(container.getNumMessagesInBatch(), 0);
+        Assert.assertNull(container.batchedMessageMetadataAndPayload);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 950095871f7..09d49195217 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -31,6 +31,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
@@ -295,4 +296,51 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
}
+
+    /**
+     * Verifies that a listener registered with {@code TableView#listen} after the view was created
+     * is not invoked for the five entries that were already loaded, and is invoked once for each of
+     * the five messages published afterwards.
+     */
+    @Test(timeOut = 30 * 1000)
+    public void testListen() throws Exception {
+        String topic = "persistent://public/default/tableview-listen-test";
+        admin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().key("key:" + i).value("value" + i).send();
+        }
+
+        @Cleanup
+        TableView<String> tv = pulsarClient.newTableViewBuilder(Schema.STRING)
+                .topic(topic)
+                .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+                .create();
+
+        class MockAction implements BiConsumer<String, String> {
+            // The listener may run on a different thread than the test, so the counter is atomic.
+            final java.util.concurrent.atomic.AtomicInteger acceptedCount =
+                    new java.util.concurrent.atomic.AtomicInteger();
+            @Override
+            public void accept(String s, String s2) {
+                acceptedCount.incrementAndGet();
+            }
+        }
+        MockAction mockAction = new MockAction();
+        tv.listen((k, v) -> mockAction.accept(k, v));
+
+        Awaitility.await()
+                .pollInterval(1, TimeUnit.SECONDS)
+                .atMost(Duration.ofMillis(5000))
+                .until(() -> tv.size() == 5);
+
+        assertEquals(mockAction.acceptedCount.get(), 0);
+
+        for (int i = 5; i < 10; i++) {
+            producer.newMessage().key("key:" + i).value("value" + i).send();
+        }
+
+        // Wait for the listener callbacks too: the view's size can reach 10 before the last
+        // callback has been delivered.
+        Awaitility.await()
+                .pollInterval(1, TimeUnit.SECONDS)
+                .atMost(Duration.ofMillis(5000))
+                .until(() -> tv.size() == 10 && mockAction.acceptedCount.get() == 5);
+
+        assertEquals(mockAction.acceptedCount.get(), 5);
+    }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index f613dda3bf3..055c595fbfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -31,6 +31,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -54,8 +55,9 @@ import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker")
public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
- private ScheduledExecutorService compactionScheduler;
- private BookKeeper bk;
+ protected ScheduledExecutorService compactionScheduler;
+ protected BookKeeper bk;
+ private TwoPhaseCompactor compactor;
@BeforeMethod
@Override
@@ -72,18 +74,23 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
+ compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
}
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
-
+ bk.close();
if (compactionScheduler != null) {
compactionScheduler.shutdownNow();
}
}
+ /**
+  * Compacts {@code topic} with this test's compactor and blocks until the compaction future completes.
+  *
+  * @param topic name of the topic to compact
+  * @return the value the compaction future completes with
+  *         (NOTE(review): presumably the compacted ledger id; confirm against Compactor)
+  * @throws ExecutionException if the compaction future completes exceptionally
+  * @throws InterruptedException if the calling thread is interrupted while waiting
+  */
+ protected long compact(String topic) throws ExecutionException, InterruptedException {
+ return compactor.compact(topic).get();
+ }
+
/**
* Compaction should retain expired keys in the compacted view
*/
@@ -105,8 +112,7 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
.topic(topic)
.create();
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).join();
+ compact(topic);
log.info(" ---- X 1: {}", mapper.writeValueAsString(
admin.topics().getInternalStats(topic, false)));
@@ -141,7 +147,7 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
.send();
}
- compactor.compact(topic).join();
+ compact(topic);
validateMessages(pulsarClient, true, topic, round, allKeys);
@@ -152,7 +158,7 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
.send();
}
- compactor.compact(topic).join();
+ compact(topic);
log.info(" ---- X 4: {}", mapper.writeValueAsString(
admin.topics().getInternalStats(topic, false)));
@@ -221,8 +227,6 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
.topic(topic)
.create();
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-
log.info(" ---- X 1: {}", mapper.writeValueAsString(
admin.topics().getInternalStats(topic, false)));
@@ -240,7 +244,7 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
validateMessages(pulsarClient, true, topic, round, allKeys);
- compactor.compact(topic).join();
+ compact(topic);
log.info(" ---- X 3: {}", mapper.writeValueAsString(
admin.topics().getInternalStats(topic, false)));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 681b4a39c8e..ee5282bf472 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -75,6 +75,7 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -92,8 +93,9 @@ import org.testng.annotations.Test;
@Test(groups = "broker-impl")
public class CompactionTest extends MockedPulsarServiceBaseTest {
- private ScheduledExecutorService compactionScheduler;
- private BookKeeper bk;
+ protected ScheduledExecutorService compactionScheduler;
+ protected BookKeeper bk;
+ private TwoPhaseCompactor compactor;
@BeforeMethod
@Override
@@ -108,18 +110,33 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
+ compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
}
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
-
+ bk.close();
if (compactionScheduler != null) {
compactionScheduler.shutdownNow();
}
}
+ /**
+  * Compacts {@code topic} with this test's compactor and blocks until the compaction future completes.
+  *
+  * @param topic name of the topic to compact
+  * @return the value the compaction future completes with
+  *         (NOTE(review): presumably the compacted ledger id; confirm against Compactor)
+  * @throws ExecutionException if the compaction future completes exceptionally
+  * @throws InterruptedException if the calling thread is interrupted while waiting
+  */
+ protected long compact(String topic) throws ExecutionException, InterruptedException {
+ return compactor.compact(topic).get();
+ }
+
+
+ /**
+  * Variant of {@code compact(String)} that also accepts a key reader.
+  *
+  * <p>This implementation ignores {@code cryptoKeyReader} and compacts exactly as the single-argument
+  * overload's body does. NOTE(review): the parameter presumably exists so that subclasses testing a
+  * compactor which needs a key reader can override this method; confirm.
+  *
+  * @param topic name of the topic to compact
+  * @param cryptoKeyReader unused here
+  * @return the value the compaction future completes with
+  * @throws ExecutionException if the compaction future completes exceptionally
+  * @throws InterruptedException if the calling thread is interrupted while waiting
+  */
+ protected long compact(String topic, CryptoKeyReader cryptoKeyReader)
+ throws ExecutionException, InterruptedException {
+ return compactor.compact(topic).get();
+ }
+
+ /**
+  * Returns the compactor under test. Tests in this class check it with {@code instanceof} to pick
+  * the assertions that apply to the compactor implementation in use.
+  */
+ protected TwoPhaseCompactor getCompactor() {
+ return compactor;
+ }
+
@Test
public void testCompaction() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
@@ -147,8 +164,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
all.add(Pair.of(key, data));
}
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);
// Compacted topic ledger should have same number of entry equals to number of unique key.
@@ -213,9 +229,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
all.add(Pair.of(key, value));
}
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
-
+ compact(topic);
// consumer with readCompacted enabled only get compacted entries
@@ -278,8 +292,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(m.getData(), "content2".getBytes());
}
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
@@ -304,8 +317,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
producer.newMessage().key("key0").value("content1".getBytes()).send();
producer.newMessage().key("key0").value("content2".getBytes()).send();
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
producer.newMessage().key("key0").value("content3".getBytes()).send();
@@ -334,8 +346,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
producer.newMessage().key("key0").value("content1".getBytes()).send();
producer.newMessage().key("key0").value("content2".getBytes()).send();
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
@@ -378,8 +389,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
producer.newMessage().key("key0").value("content1".getBytes()).send();
producer.newMessage().key("key0").value("content2".getBytes()).send();
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
@@ -417,7 +427,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
- new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+ compact(topic);
producer.newMessage().key("key0").value("content0".getBytes()).send();
@@ -453,8 +463,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// Check that messages after compaction have same ids
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
@@ -512,8 +521,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
((BatchMessageIdImpl)messages.get(2).getMessageId()).getEntryId());
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// Check that messages after compaction have same ids
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
@@ -521,12 +529,23 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
Message<byte[]> message1 = consumer.receive();
Assert.assertEquals(message1.getKey(), "key1");
Assert.assertEquals(new String(message1.getData()), "my-message-1");
- Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId());
Message<byte[]> message2 = consumer.receive();
Assert.assertEquals(message2.getKey(), "key2");
Assert.assertEquals(new String(message2.getData()), "my-message-3");
- Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId());
+ if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+ MessageIdImpl id = (MessageIdImpl) messages.get(0).getMessageId();
+ MessageIdImpl id1 = new MessageIdImpl(
+ id.getLedgerId(), id.getEntryId(), id.getPartitionIndex());
+ Assert.assertEquals(message1.getMessageId(), id1);
+ id = (MessageIdImpl) messages.get(2).getMessageId();
+ MessageIdImpl id2 = new MessageIdImpl(
+ id.getLedgerId(), id.getEntryId(), id.getPartitionIndex());
+ Assert.assertEquals(message2.getMessageId(), id2);
+ } else {
+ Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId());
+ Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId());
+ }
}
}
@@ -556,8 +575,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -593,34 +611,46 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
- Message<byte[]> message1 = consumer.receive();
- Assert.assertFalse(message1.hasKey());
- Assert.assertEquals(new String(message1.getData()), "my-message-1");
+ if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+ Message<byte[]> message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key1");
+ Assert.assertEquals(new String(message3.getData()), "my-message-4");
- Message<byte[]> message2 = consumer.receive();
- Assert.assertFalse(message2.hasKey());
- Assert.assertEquals(new String(message2.getData()), "my-message-2");
+ Message<byte[]> message4 = consumer.receive();
+ Assert.assertEquals(message4.getKey(), "key2");
+ Assert.assertEquals(new String(message4.getData()), "my-message-6");
- Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key1");
- Assert.assertEquals(new String(message3.getData()), "my-message-4");
+ Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+ assertNull(m);
+ } else {
+ Message<byte[]> message1 = consumer.receive();
+ Assert.assertFalse(message1.hasKey());
+ Assert.assertEquals(new String(message1.getData()), "my-message-1");
- Message<byte[]> message4 = consumer.receive();
- Assert.assertEquals(message4.getKey(), "key2");
- Assert.assertEquals(new String(message4.getData()), "my-message-6");
+ Message<byte[]> message2 = consumer.receive();
+ Assert.assertFalse(message2.hasKey());
+ Assert.assertEquals(new String(message2.getData()), "my-message-2");
- Message<byte[]> message5 = consumer.receive();
- Assert.assertFalse(message5.hasKey());
- Assert.assertEquals(new String(message5.getData()), "my-message-7");
+ Message<byte[]> message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key1");
+ Assert.assertEquals(new String(message3.getData()), "my-message-4");
- Message<byte[]> message6 = consumer.receive();
- Assert.assertFalse(message6.hasKey());
- Assert.assertEquals(new String(message6.getData()), "my-message-8");
+ Message<byte[]> message4 = consumer.receive();
+ Assert.assertEquals(message4.getKey(), "key2");
+ Assert.assertEquals(new String(message4.getData()), "my-message-6");
+
+ Message<byte[]> message5 = consumer.receive();
+ Assert.assertFalse(message5.hasKey());
+ Assert.assertEquals(new String(message5.getData()), "my-message-7");
+
+ Message<byte[]> message6 = consumer.receive();
+ Assert.assertFalse(message6.hasKey());
+ Assert.assertEquals(new String(message6.getData()), "my-message-8");
+ }
}
}
@@ -693,8 +723,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -773,8 +802,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -837,8 +865,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
Assert.assertTrue(ledgersOpened.isEmpty()); // no ledgers should have been opened
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// should have opened all except last to read
Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
@@ -866,7 +893,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
ledgersOpened.clear();
// compact the topic again
- compactor.compact(topic).get();
+ compact(topic);
// shouldn't have opened first ledger (already compacted), penultimate would have some uncompacted data.
// last ledger already open for writing
@@ -916,8 +943,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -960,8 +986,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -975,7 +1000,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
}
- class EncKeyReader implements CryptoKeyReader {
+ public static class EncKeyReader implements CryptoKeyReader {
EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
@Override
@@ -1037,8 +1062,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic, new EncKeyReader());
// Check that messages after compaction have same ids
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
@@ -1083,11 +1107,8 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic, new EncKeyReader());
- // with encryption, all messages are passed through compaction as it doesn't
- // have the keys to decrypt the batch payload
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").cryptoKeyReader(new EncKeyReader())
.readCompacted(true).subscribe()){
@@ -1095,13 +1116,21 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(message1.getKey(), "key1");
Assert.assertEquals(new String(message1.getData()), "my-message-1");
- Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()), "my-message-2");
-
- Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key2");
- Assert.assertEquals(new String(message3.getData()), "my-message-3");
+ if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+ Message<byte[]> message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key2");
+ Assert.assertEquals(new String(message3.getData()), "my-message-3");
+ } else {
+ // with encryption, all messages are passed through compaction as it doesn't
+ // have the keys to decrypt the batch payload
+ Message<byte[]> message2 = consumer.receive();
+ Assert.assertEquals(message2.getKey(), "key2");
+ Assert.assertEquals(new String(message2.getData()), "my-message-2");
+
+ Message<byte[]> message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key2");
+ Assert.assertEquals(new String(message3.getData()), "my-message-3");
+ }
}
}
@@ -1132,8 +1161,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic, new EncKeyReader());
// Check that messages after compaction have same ids
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
@@ -1179,8 +1207,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic, new EncKeyReader());
// with encryption, all messages are passed through compaction as it doesn't
// have the keys to decrypt the batch payload
@@ -1191,13 +1218,20 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(message1.getKey(), "key1");
Assert.assertEquals(new String(message1.getData()), "my-message-1");
- Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()), "my-message-2");
- Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key2");
- Assert.assertEquals(new String(message3.getData()), "my-message-3");
+ if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+ Message<byte[]> message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key2");
+ Assert.assertEquals(new String(message3.getData()), "my-message-3");
+ } else {
+ Message<byte[]> message2 = consumer.receive();
+ Assert.assertEquals(message2.getKey(), "key2");
+ Assert.assertEquals(new String(message2.getData()), "my-message-2");
+
+ Message<byte[]> message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key2");
+ Assert.assertEquals(new String(message3.getData()), "my-message-3");
+ }
}
}
@@ -1254,8 +1288,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic, new EncKeyReader());
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.cryptoKeyReader(new EncKeyReader())
@@ -1264,22 +1297,32 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(message1.getKey(), "key0");
Assert.assertEquals(new String(message1.getData()), "my-message-0");
- // see all messages from batch
- Message<byte[]> message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()), "my-message-2");
+ if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+ Message<byte[]> message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key3");
+ Assert.assertEquals(new String(message3.getData()), "my-message-3");
- Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key3");
- Assert.assertEquals(new String(message3.getData()), "my-message-3");
-
- Message<byte[]> message4 = consumer.receive();
- Assert.assertEquals(message4.getKey(), "key2");
- Assert.assertEquals(new String(message4.getData()), "");
-
- Message<byte[]> message5 = consumer.receive();
- Assert.assertEquals(message5.getKey(), "key4");
- Assert.assertEquals(new String(message5.getData()), "my-message-4");
+ Message<byte[]> message5 = consumer.receive();
+ Assert.assertEquals(message5.getKey(), "key4");
+ Assert.assertEquals(new String(message5.getData()), "my-message-4");
+ } else {
+ // see all messages from batch
+ Message<byte[]> message2 = consumer.receive();
+ Assert.assertEquals(message2.getKey(), "key2");
+ Assert.assertEquals(new String(message2.getData()), "my-message-2");
+
+ Message<byte[]> message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key3");
+ Assert.assertEquals(new String(message3.getData()), "my-message-3");
+
+ Message<byte[]> message4 = consumer.receive();
+ Assert.assertEquals(message4.getKey(), "key2");
+ Assert.assertEquals(new String(message4.getData()), "");
+
+ Message<byte[]> message5 = consumer.receive();
+ Assert.assertEquals(message5.getKey(), "key4");
+ Assert.assertEquals(new String(message5.getData()), "my-message-4");
+ }
}
}
@@ -1303,8 +1346,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
producer.newMessage().key("1").value("".getBytes()).send();
producer.newMessage().key("2").value("".getBytes()).send();
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
Set<String> expected = Sets.newHashSet("3");
// consumer with readCompacted enabled only get compacted entries
@@ -1329,8 +1371,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
producer.newMessage().key("1").value("".getBytes()).send();
producer.newMessage().key("2").value("".getBytes()).send();
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// consumer with readCompacted enabled only get compacted entries
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1364,8 +1405,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
FutureUtil.waitForAll(futures).get();
// 2.compact the topic.
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// consumer with readCompacted enabled only get compacted entries
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1409,8 +1449,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
FutureUtil.waitForAll(futures).get();
// 2.compact the topic.
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// consumer with readCompacted enabled only get compacted entries
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1465,8 +1504,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
FutureUtil.waitForAll(futures).get();
// 2.compact the topic.
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// consumer with readCompacted enabled only get compacted entries
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1519,8 +1557,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
FutureUtil.waitForAll(futures).get();
// 2.compact the topic.
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// consumer with readCompacted enabled only get compacted entries
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1559,8 +1596,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
FutureUtil.waitForAll(futures).get();
// 2.compact the topic.
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// 3. Send more ten messages
futures.clear();
@@ -1608,8 +1644,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
FutureUtil.waitForAll(futures).get();
// 2.compact the topic.
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// 3. Send more ten messages
futures.clear();
@@ -1638,7 +1673,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
producer.newMessage().key(key).value(("").getBytes()).send();
// 5.compact the topic.
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 8099cd51668..e86be6a4db8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -58,7 +59,12 @@ import org.testng.annotations.Test;
@Test(groups = "broker-compaction")
public class CompactorTest extends MockedPulsarServiceBaseTest {
- private ScheduledExecutorService compactionScheduler;
+ protected ScheduledExecutorService compactionScheduler;
+
+ protected BookKeeper bk;
+ protected Compactor compactor;
+
+
@BeforeMethod
@Override
@@ -73,20 +79,31 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
+ bk = pulsar.getBookKeeperClientFactory().create(
+ this.conf, null, null, Optional.empty(), null);
+ compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
}
+
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
+ bk.close();
compactionScheduler.shutdownNow();
}
+ protected long compact(String topic) throws ExecutionException, InterruptedException {
+ return compactor.compact(topic).get();
+ }
+
+ protected Compactor getCompactor() {
+ return compactor;
+ }
+
private List<String> compactAndVerify(String topic, Map<String, byte[]> expected, boolean checkMetrics) throws Exception {
- BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
- this.conf, null, null, Optional.empty(), null);
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- long compactedLedgerId = compactor.compact(topic).get();
+
+ long compactedLedgerId = compact(topic);
LedgerHandle ledger = bk.openLedger(compactedLedgerId,
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
@@ -111,7 +128,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
m.close();
}
if (checkMetrics) {
- CompactionRecord compactionRecord = compactor.getStats().getCompactionRecordForTopic(topic).get();
+ CompactionRecord compactionRecord = getCompactor().getStats().getCompactionRecordForTopic(topic).get();
long compactedTopicRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount();
long lastCompactSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp();
long lastCompactFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp();
@@ -227,10 +244,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
// trigger creation of topic on server side
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
- BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
- this.conf, null, null, Optional.empty(), null);
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
}
@Test
@@ -240,7 +254,6 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, Mockito.mock(PulsarClientImpl.class),
Mockito.mock(BookKeeper.class), compactionScheduler);
Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60);
-
}
public ByteBuf extractPayload(RawMessage m) throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/NumericOrderCompactionStrategy.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/NumericOrderCompactionStrategy.java
new file mode 100644
index 00000000000..64e761c34cc
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/NumericOrderCompactionStrategy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+
+/**
+ * Test-only {@link TopicCompactionStrategy} over {@code Integer} values.
+ *
+ * <p>{@link #shouldKeepLeft(Integer, Integer)} answers {@code true} only when both
+ * values are non-null and the left one is greater than or equal to the right one;
+ * a {@code null} on either side always yields {@code false}.
+ */
+public class NumericOrderCompactionStrategy implements TopicCompactionStrategy<Integer> {
+
+    /** Returns {@link Schema#INT32}, the schema of the values this strategy compares. */
+    @Override
+    public Schema<Integer> getSchema() {
+        return Schema.INT32;
+    }
+
+    /**
+     * Decides whether {@code prev} should be kept in preference to {@code cur}.
+     *
+     * @param prev the left-hand (earlier) value; may be {@code null}
+     * @param cur  the right-hand (later) value; may be {@code null}
+     * @return {@code true} only if both values are non-null and {@code prev >= cur}
+     */
+    @Override
+    public boolean shouldKeepLeft(Integer prev, Integer cur) {
+        if (prev == null || cur == null) {
+            // A null on either side never causes the left value to be kept.
+            return false;
+        }
+        return prev >= cur;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java
new file mode 100644
index 00000000000..1cac04c2fa9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the inherited {@link CompactionRetentionTest} cases with {@link #compact(String)}
+ * overridden so that compaction goes through a {@link StrategicTwoPhaseCompactor}.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class StrategicCompactionRetentionTest extends CompactionRetentionTest {
+    /** Strategy handed to the compactor on every {@link #compact(String)} call. */
+    private TopicCompactionStrategy compactionStrategy;
+    /** Compactor under test; rebuilt before each test method. */
+    private StrategicTwoPhaseCompactor strategicCompactor;
+
+    /** Runs the parent setup, then creates the strategy and the strategic compactor. */
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        compactionStrategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
+        strategicCompactor =
+                new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
+    }
+
+    /**
+     * Compacts {@code topic} with the strategic compactor and blocks until it completes.
+     *
+     * @return the value produced by the compaction future, as a {@code long}
+     */
+    @Override
+    protected long compact(String topic) throws ExecutionException, InterruptedException {
+        final Object compactionResult = strategicCompactor.compact(topic, compactionStrategy).get();
+        return (long) compactionResult;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
new file mode 100644
index 00000000000..135a839bd54
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Re-runs the inherited {@link CompactionTest} cases through a {@link StrategicTwoPhaseCompactor}
+ * and adds a case that compacts with {@link NumericOrderCompactionStrategy}.
+ */
+@Test(groups = "flaky")
+public class StrategicCompactionTest extends CompactionTest {
+    /** Strategy passed to every compaction; replaced by individual tests when needed. */
+    private TopicCompactionStrategy strategy;
+    /** Compactor under test; rebuilt before each test method. */
+    private StrategicTwoPhaseCompactor compactor;
+
+    /** Runs the parent setup, then installs the strategic compactor and a default strategy. */
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
+        strategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
+    }
+
+    /** Compacts {@code topic} with the current strategy and waits for the result. */
+    @Override
+    protected long compact(String topic) throws ExecutionException, InterruptedException {
+        return (long) compactor.compact(topic, strategy).get();
+    }
+
+    /** Same as {@link #compact(String)}, additionally passing {@code cryptoKeyReader} to the compactor. */
+    @Override
+    protected long compact(String topic, CryptoKeyReader cryptoKeyReader)
+            throws ExecutionException, InterruptedException {
+        return (long) compactor.compact(topic, strategy, cryptoKeyReader).get();
+    }
+
+    /** Exposes the strategic compactor so inherited tests can branch on its type. */
+    @Override
+    protected TwoPhaseCompactor getCompactor() {
+        return compactor;
+    }
+
+    /**
+     * Publishes pseudo-random integer values (some {@code null}) under a handful of keys, compacts
+     * with {@link NumericOrderCompactionStrategy}, and checks the compacted ledger, a compacted
+     * read, a full-backlog read and a {@link TableView} against a locally computed expectation.
+     */
+    @Test
+    public void testNumericOrderCompaction() throws Exception {
+        // Keep a typed handle so schema and comparison calls avoid the raw-typed field.
+        TopicCompactionStrategy<Integer> numericStrategy = new NumericOrderCompactionStrategy();
+        strategy = numericStrategy;
+
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        final int numMessages = 50;
+        final int maxKeys = 5;
+
+        Map<String, Integer> expected = new HashMap<>();
+        List<Pair<String, Integer>> all = new ArrayList<>();
+        Random r = new Random(0);
+
+        // The producer is only needed while publishing; close it once all messages are sent.
+        try (Producer<Integer> producer = pulsarClient.newProducer(numericStrategy.getSchema())
+                .topic(topic)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create()) {
+            // create the subscription up front, before anything is published
+            pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                    .readCompacted(true).subscribe().close();
+
+            for (int j = 0; j < numMessages; j++) {
+                int keyIndex = r.nextInt(maxKeys);
+                String key = "key" + keyIndex;
+                int seed = r.nextInt(j + 1);
+                Integer cur = seed < j / 5 ? null : seed;
+                producer.newMessage().key(key).value(cur).send();
+                Integer prev = expected.get(key);
+                if (!numericStrategy.shouldKeepLeft(prev, cur)) {
+                    if (cur == null) {
+                        expected.remove(key);
+                    } else {
+                        expected.put(key, cur);
+                    }
+                }
+                all.add(Pair.of(key, cur));
+            }
+        }
+
+        compact(topic);
+
+        PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);
+        // The compacted ledger should hold exactly one entry per surviving key.
+        Assert.assertEquals(internalStats.compactedLedger.entries, expected.size());
+        Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
+        Assert.assertFalse(internalStats.compactedLedger.offloaded);
+
+        Map<String, Integer> expectedCopy = new HashMap<>(expected);
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<Integer> consumer = pulsarClient.newConsumer(numericStrategy.getSchema())
+                .topic(topic).subscriptionName("sub1").readCompacted(true).subscribe()) {
+            while (!expected.isEmpty()) {
+                Message<Integer> m = consumer.receive(2, TimeUnit.SECONDS);
+                // A timed receive yields null on timeout; fail with context instead of an NPE.
+                Assert.assertNotNull(m, "timed out with compacted keys still pending: " + expected.keySet());
+                Assert.assertEquals(m.getValue(), expected.remove(m.getKey()), m.getKey());
+            }
+        }
+
+        // can get full backlog if read compacted disabled
+        try (Consumer<Integer> consumer = pulsarClient.newConsumer(numericStrategy.getSchema())
+                .topic(topic).subscriptionName("sub1").readCompacted(false).subscribe()) {
+            for (Pair<String, Integer> expectedMessage : all) {
+                Message<Integer> m = consumer.receive(2, TimeUnit.SECONDS);
+                Assert.assertNotNull(m, "timed out waiting for backlog message " + expectedMessage);
+                Assert.assertEquals(m.getKey(), expectedMessage.getLeft());
+                Assert.assertEquals(m.getValue(), expectedMessage.getRight());
+            }
+        }
+
+        // Close the table view once compared so the test does not leak it.
+        try (TableView<Integer> tableView = pulsar.getClient().newTableViewBuilder(numericStrategy.getSchema())
+                .topic(topic)
+                .loadConf(Map.of(
+                        "topicCompactionStrategyClassName", numericStrategy.getClass().getCanonicalName()))
+                .create()) {
+            Assert.assertEquals(tableView.entrySet(), expectedCopy.entrySet());
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java
new file mode 100644
index 00000000000..91dd8a2bd35
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the inherited {@link CompactorTest} cases against a {@link StrategicTwoPhaseCompactor}
+ * by overriding the {@link #compact(String)} and {@link #getCompactor()} hooks.
+ */
+@Test(groups = "broker-compaction")
+public class StrategicCompactorTest extends CompactorTest {
+    /** Strategy passed to the compactor on every {@link #compact(String)} call. */
+    private TopicCompactionStrategy strategy;
+
+    /**
+     * Compactor under test. Deliberately not named {@code compactor}, which would hide the
+     * inherited {@code CompactorTest.compactor} field.
+     */
+    private StrategicTwoPhaseCompactor strategicCompactor;
+
+    /** Runs the parent setup, then creates the strategic compactor and its strategy. */
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        strategicCompactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
+        strategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
+    }
+
+    /** Compacts {@code topic} with the strategic compactor and waits for the result. */
+    @Override
+    protected long compact(String topic) throws ExecutionException, InterruptedException {
+        return (long) strategicCompactor.compact(topic, strategy).get();
+    }
+
+    /** Returns the strategic compactor so inherited checks read this compactor's stats. */
+    @Override
+    protected Compactor getCompactor() {
+        return strategicCompactor;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java
new file mode 100644
index 00000000000..0ecd09606ce
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-compaction")
+public class TopicCompactionStrategyTest {
+ public static class DummyTopicCompactionStrategy implements TopicCompactionStrategy<byte[]> {
+
+ @Override
+ public Schema getSchema() {
+ return Schema.BYTES;
+ }
+
+ @Override
+ public boolean shouldKeepLeft(byte[] prev, byte[] cur) {
+ return false;
+ }
+ }
+
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testLoadInvalidTopicCompactionStrategy() {
+ TopicCompactionStrategy.load("uknown");
+ }
+
+ @Test
+ public void testNumericOrderCompactionStrategy() {
+ TopicCompactionStrategy<Integer> strategy =
+ TopicCompactionStrategy.load(NumericOrderCompactionStrategy.class.getCanonicalName());
+ Assert.assertFalse(strategy.shouldKeepLeft(1, 2));
+ Assert.assertTrue(strategy.shouldKeepLeft(2, 1));
+ }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
index 136d8f53823..9e5008c8bd0 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
@@ -89,7 +89,15 @@ public interface TableView<T> extends Closeable {
void forEach(BiConsumer<String, T> action);
/**
- * Performs the give action for each entry in this map until all entries
+ * Registers the given action to be performed for each future update to this map.
+ * Entries already present in the map when this method is called are not replayed.
+ *
+ * @param action The action to be performed for each entry
+ */
+ void listen(BiConsumer<String, T> action);
+
+ /**
+ * Performs the given action for each entry in this map until all entries
* have been processed or the action throws an exception.
*
* @param action The action to be performed for each entry
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index c601f51a725..fdbf1f15c29 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -49,21 +49,21 @@ import org.slf4j.LoggerFactory;
*/
class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
- private MessageMetadata messageMetadata = new MessageMetadata();
+ protected MessageMetadata messageMetadata = new MessageMetadata();
// sequence id for this batch which will be persisted as a single entry by broker
@Getter
@Setter
- private long lowestSequenceId = -1L;
+ protected long lowestSequenceId = -1L;
@Getter
@Setter
- private long highestSequenceId = -1L;
- private ByteBuf batchedMessageMetadataAndPayload;
- private List<MessageImpl<?>> messages = new ArrayList<>(maxMessagesNum);
+ protected long highestSequenceId = -1L;
+ protected ByteBuf batchedMessageMetadataAndPayload;
+ protected List<MessageImpl<?>> messages = new ArrayList<>(maxMessagesNum);
protected SendCallback previousCallback = null;
// keep track of callbacks for individual messages being published in a batch
protected SendCallback firstCallback;
- private final ByteBufAllocator allocator;
+ protected final ByteBufAllocator allocator;
private static final int SHRINK_COOLING_OFF_PERIOD = 10;
private int consecutiveShrinkTime = 0;
@@ -111,9 +111,11 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
}
} catch (Throwable e) {
log.error("construct first message failed, exception is ", e);
- producer.semaphoreRelease(getNumMessagesInBatch());
- producer.client.getMemoryLimitController().releaseMemory(msg.getUncompressedSize()
- + batchAllocatedSizeBytes);
+ if (producer != null) {
+ producer.semaphoreRelease(getNumMessagesInBatch());
+ producer.client.getMemoryLimitController().releaseMemory(msg.getUncompressedSize()
+ + batchAllocatedSizeBytes);
+ }
discard(new PulsarClientException(e));
return false;
}
@@ -131,11 +133,14 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
messageMetadata.setSequenceId(lowestSequenceId);
}
highestSequenceId = msg.getSequenceId();
- ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));
+ if (producer != null) {
+ ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));
+ }
+
return isBatchFull();
}
- private ByteBuf getCompressedBatchMetadataAndPayload() {
+ protected ByteBuf getCompressedBatchMetadataAndPayload() {
int batchWriteIndex = batchedMessageMetadataAndPayload.writerIndex();
int batchReadIndex = batchedMessageMetadataAndPayload.readerIndex();
@@ -308,11 +313,13 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
return op;
}
- private void updateAndReserveBatchAllocatedSize(int updatedSizeBytes) {
+ protected void updateAndReserveBatchAllocatedSize(int updatedSizeBytes) {
int delta = updatedSizeBytes - batchAllocatedSizeBytes;
batchAllocatedSizeBytes = updatedSizeBytes;
if (delta != 0) {
- producer.client.getMemoryLimitController().forceReserveMemory(delta);
+ if (producer != null) {
+ producer.client.getMemoryLimitController().forceReserveMemory(delta);
+ }
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 238beffc79f..83931c40394 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -36,7 +36,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
@@ -69,7 +68,8 @@ public class ReaderImpl<T> implements Reader<T> {
consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
consumerConfiguration.setSubscriptionName(subscription);
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
- consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
+ consumerConfiguration.setSubscriptionMode(readerConfiguration.getSubscriptionMode());
+ consumerConfiguration.setSubscriptionInitialPosition(readerConfiguration.getSubscriptionInitialPosition());
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java
index 31df24b60f8..a7dc6f94a49 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java
@@ -30,6 +30,7 @@ public class TableViewConfigurationData implements Serializable, Cloneable {
private String topicName = null;
private String subscriptionName = null;
private long autoUpdatePartitionsSeconds = 60;
+ private String topicCompactionStrategyClassName = null;
@Override
public TableViewConfigurationData clone() {
@@ -38,6 +39,7 @@ public class TableViewConfigurationData implements Serializable, Cloneable {
clone.setTopicName(topicName);
clone.setAutoUpdatePartitionsSeconds(autoUpdatePartitionsSeconds);
clone.setSubscriptionName(subscriptionName);
+ clone.setTopicCompactionStrategyClassName(topicCompactionStrategyClassName);
return clone;
} catch (CloneNotSupportedException e) {
throw new AssertionError();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 9e7257f23fb..64f2e642991 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
@Slf4j
public class TableViewImpl<T> implements TableView<T> {
@@ -54,6 +55,7 @@ public class TableViewImpl<T> implements TableView<T> {
private final List<BiConsumer<String, T>> listeners;
private final ReentrantLock listenersMutex;
private final boolean isPersistentTopic;
+ private TopicCompactionStrategy<T> compactionStrategy;
TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
this.conf = conf;
@@ -62,6 +64,7 @@ public class TableViewImpl<T> implements TableView<T> {
this.immutableData = Collections.unmodifiableMap(data);
this.listeners = new ArrayList<>();
this.listenersMutex = new ReentrantLock();
+ this.compactionStrategy = TopicCompactionStrategy.load(conf.getTopicCompactionStrategyClassName());
ReaderBuilder<T> readerBuilder = client.newReader(schema)
.topic(conf.getTopicName())
.startMessageId(MessageId.earliest)
@@ -125,6 +128,16 @@ public class TableViewImpl<T> implements TableView<T> {
data.forEach(action);
}
+ @Override
+ public void listen(BiConsumer<String, T> action) {
+ try {
+ listenersMutex.lock();
+ listeners.add(action);
+ } finally {
+ listenersMutex.unlock();
+ }
+ }
+
@Override
public void forEachAndListen(BiConsumer<String, T> action) {
// Ensure we iterate over all the existing entry _and_ start the listening from the exact next message
@@ -157,30 +170,40 @@ public class TableViewImpl<T> implements TableView<T> {
private void handleMessage(Message<T> msg) {
try {
if (msg.hasKey()) {
+ String key = msg.getKey();
+ T cur = msg.size() > 0 ? msg.getValue() : null;
if (log.isDebugEnabled()) {
log.debug("Applying message from topic {}. key={} value={}",
conf.getTopicName(),
- msg.getKey(),
- msg.getValue());
+ key,
+ cur);
}
- try {
- listenersMutex.lock();
- if (null == msg.getValue()){
- data.remove(msg.getKey());
- } else {
- data.put(msg.getKey(), msg.getValue());
- }
+ T prev = data.get(key);
+ boolean update = true;
+ if (compactionStrategy != null) {
+ update = !compactionStrategy.shouldKeepLeft(prev, cur);
+ }
+
+ if (update) {
+ try {
+ listenersMutex.lock();
+ if (null == cur) {
+ data.remove(key);
+ } else {
+ data.put(key, cur);
+ }
- for (BiConsumer<String, T> listener : listeners) {
- try {
- listener.accept(msg.getKey(), msg.getValue());
- } catch (Throwable t) {
- log.error("Table view listener raised an exception", t);
+ for (BiConsumer<String, T> listener : listeners) {
+ try {
+ listener.accept(key, cur);
+ } catch (Throwable t) {
+ log.error("Table view listener raised an exception", t);
+ }
}
+ } finally {
+ listenersMutex.unlock();
}
- } finally {
- listenersMutex.unlock();
}
}
} finally {
@@ -208,6 +231,9 @@ public class TableViewImpl<T> implements TableView<T> {
handleMessage(msg);
readAllExistingMessages(reader, future, startTime, messagesRead);
}).exceptionally(ex -> {
+ logException(
+ String.format("Reader %s was interrupted while reading existing messages",
+ reader.getTopic()), ex);
future.completeExceptionally(ex);
return null;
});
@@ -231,8 +257,18 @@ public class TableViewImpl<T> implements TableView<T> {
handleMessage(msg);
readTailMessages(reader);
}).exceptionally(ex -> {
- log.info("Reader {} was interrupted", reader.getTopic());
+ logException(
+ String.format("Reader %s was interrupted while reading tail messages.",
+ reader.getTopic()), ex);
return null;
});
}
+
+ private void logException(String msg, Throwable ex) {
+ if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
+ log.warn(msg, ex);
+ } else {
+ log.error(msg, ex);
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index 42e115ea872..86707d2aa2f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -32,6 +32,8 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.ReaderInterceptor;
import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
@Data
public class ReaderConfigurationData<T> implements Serializable, Cloneable {
@@ -153,6 +155,10 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
private long expireTimeOfIncompleteChunkedMessageMillis = TimeUnit.MINUTES.toMillis(1);
+ private SubscriptionMode subscriptionMode = SubscriptionMode.NonDurable;
+
+ private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
@JsonIgnore
public String getTopicName() {
if (topicNames.size() > 1) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
new file mode 100644
index 00000000000..f06374b234f
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.topics;
+
+import org.apache.pulsar.client.api.Schema;
+
+/**
+ * Defines a custom strategy to compact messages in a topic.
+ * This strategy can be passed to Topic Compactor and Table View to compact messages in a custom way.
+ *
+ * Examples:
+ *
+ * TopicCompactionStrategy strategy = new MyTopicCompactionStrategy();
+ *
+ * // Run topic compaction by the compaction strategy.
+ * // While compacting messages for each key, the current message replaces the previous one
+ * // only if TopicCompactionStrategy.shouldKeepLeft(prev, cur) returns false.
+ * StrategicTwoPhaseCompactor compactor = new StrategicTwoPhaseCompactor(...);
+ * compactor.compact(topic, strategy);
+ *
+ * // Run table view by the compaction strategy.
+ * // While updating the table view <key,value> map, the current message is applied
+ * // only if TopicCompactionStrategy.shouldKeepLeft(prev, cur) returns false.
+ * TableView tableView = pulsar.getClient().newTableViewBuilder(strategy.getSchema())
+ * .topic(topic)
+ * .loadConf(Map.of(
+ * "topicCompactionStrategyClassName", strategy.getClass().getCanonicalName()))
+ * .create();
+ */
+public interface TopicCompactionStrategy<T> {
+
+ /**
+ * Returns the schema object for this strategy.
+ * @return the schema of the message values compared by this strategy
+ */
+ Schema<T> getSchema();
+ /**
+ * Tests if the compaction needs to keep the left (previous message)
+ * compared to the right (current message) for the same key.
+ *
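+ * @param prev previous message value; may be null (TableView passes null when the key has no entry)
+ * @param cur current message value; may be null (TableView passes null for an empty payload)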
+ * @return True if it needs to keep the previous message and ignore the current message. Otherwise, False.
+ */
+ boolean shouldKeepLeft(T prev, T cur);
+
+ static TopicCompactionStrategy load(String topicCompactionStrategyClassName) {
+ if (topicCompactionStrategyClassName == null) {
+ return null;
+ }
+ try {
+ Class<?> clazz = Class.forName(topicCompactionStrategyClassName);
+ Object instance = clazz.getDeclaredConstructor().newInstance();
+ return (TopicCompactionStrategy) instance;
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "Error when loading topic compaction strategy: " + topicCompactionStrategyClassName, e);
+ }
+ }
+}