You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2020/06/02 07:40:13 UTC
[pulsar] branch master updated: PIP 37: [pulsar-client] support
large message size (#4400)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 870a637 PIP 37: [pulsar-client] support large message size (#4400)
870a637 is described below
commit 870a637b4906862a611e418341dd926e21458f08
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Jun 2 00:39:55 2020 -0700
PIP 37: [pulsar-client] support large message size (#4400)
* PIP 37: [pulsar-client] support large message size
fix producer
fix ref counts
add timeouts
add validation
fix recycling
fix stats
review
fix test
fix test
fix send message and expiry-consumer-config
fix schema test
fix chunk properties
* fix test
---
.../broker/admin/impl/PersistentTopicsBase.java | 4 +
.../broker/service/AbstractBaseDispatcher.java | 3 +
.../org/apache/pulsar/broker/service/Consumer.java | 7 +-
.../org/apache/pulsar/broker/service/Producer.java | 43 ++-
.../pulsar/broker/service/SendMessageInfo.java | 2 +
.../apache/pulsar/broker/service/ServerCnx.java | 5 +-
.../NonPersistentDispatcherMultipleConsumers.java | 4 +-
...onPersistentDispatcherSingleActiveConsumer.java | 4 +-
...istentStickyKeyDispatcherMultipleConsumers.java | 4 +-
.../PersistentDispatcherMultipleConsumers.java | 3 +-
.../PersistentDispatcherSingleActiveConsumer.java | 5 +-
...istentStickyKeyDispatcherMultipleConsumers.java | 7 +-
.../service/persistent/PersistentSubscription.java | 1 +
.../broker/service/persistent/PersistentTopic.java | 12 +
...ntStickyKeyDispatcherMultipleConsumersTest.java | 2 +
.../pulsar/client/impl/MessageChunkingTest.java | 379 +++++++++++++++++++++
.../apache/pulsar/client/api/ConsumerBuilder.java | 45 +++
.../apache/pulsar/client/api/ProducerBuilder.java | 27 ++
.../org/apache/pulsar/client/cli/CmdConsume.java | 21 +-
.../org/apache/pulsar/client/cli/CmdProduce.java | 13 +-
.../apache/pulsar/client/impl/ConsumerBase.java | 3 +
.../pulsar/client/impl/ConsumerBuilderImpl.java | 18 +
.../apache/pulsar/client/impl/ConsumerImpl.java | 254 ++++++++++++--
.../client/impl/MultiTopicsConsumerImpl.java | 5 +-
.../pulsar/client/impl/NegativeAcksTracker.java | 2 +
.../PersistentAcknowledgmentsGroupingTracker.java | 73 +++-
.../pulsar/client/impl/ProducerBuilderImpl.java | 9 +
.../apache/pulsar/client/impl/ProducerImpl.java | 237 ++++++++-----
.../pulsar/client/impl/UnAckedMessageTracker.java | 14 +
.../impl/conf/ConsumerConfigurationData.java | 7 +
.../impl/conf/ProducerConfigurationData.java | 1 +
.../impl/AcknowledgementsGroupingTrackerTest.java | 2 +
.../apache/pulsar/common/api/proto/PulsarApi.java | 322 +++++++++++++++++
.../pulsar/common/policies/data/ConsumerStats.java | 3 +
.../policies/data/NonPersistentPublisherStats.java | 2 +-
.../common/policies/data/PublisherStats.java | 3 +
.../common/policies/data/SubscriptionStats.java | 3 +
.../pulsar/common/policies/data/TopicStats.java | 3 +
.../apache/pulsar/common/protocol/Commands.java | 6 +
pulsar-common/src/main/proto/PulsarApi.proto | 5 +
.../pulsar/testclient/PerformanceConsumer.java | 21 +-
.../pulsar/testclient/PerformanceProducer.java | 13 +-
42 files changed, 1450 insertions(+), 147 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a8ab7fd..99f49a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1931,6 +1931,10 @@ public class PersistentTopicsBase extends AdminResource {
if (metadata.hasNullValue()) {
responseBuilder.header("X-Pulsar-null-value", metadata.hasNullValue());
}
+ if (metadata.getNumChunksFromMsg() > 0) {
+ responseBuilder.header("X-Pulsar-PROPERTY-TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg()));
+ responseBuilder.header("X-Pulsar-PROPERTY-CHUNK-ID", Integer.toString(metadata.getChunkId()));
+ }
// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 849d626..7985f30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -71,6 +71,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor) {
int totalMessages = 0;
long totalBytes = 0;
+ int totalChunkedMessages = 0;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
@@ -103,6 +104,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
int batchSize = msgMetadata.getNumMessagesInBatch();
totalMessages += batchSize;
totalBytes += metadataAndPayload.readableBytes();
+ totalChunkedMessages += msgMetadata.hasChunkId() ? 1: 0;
batchSizes.setBatchSize(i, batchSize);
if (indexesAcks != null && cursor != null) {
long[] ackSet = cursor.getDeletedBatchIndexesAsLongArray(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
@@ -119,6 +121,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
+ sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
}
private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index d14ea5e..2131b18 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -87,6 +87,7 @@ public class Consumer {
private long lastConsumedTimestamp;
private long lastAckedTimestamp;
+ private Rate chuckedMessageRate;
// Represents how many messages we can safely send to the consumer without
// overflowing its receiving queue. The consumer will use Flow commands to
@@ -146,6 +147,7 @@ public class Consumer {
this.keySharedMeta = keySharedMeta;
this.cnx = cnx;
this.msgOut = new Rate();
+ this.chuckedMessageRate = new Rate();
this.msgRedeliver = new Rate();
this.bytesOutCounter = new LongAdder();
this.msgOutCounter = new LongAdder();
@@ -214,7 +216,7 @@ public class Consumer {
*/
public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
- int totalMessages, long totalBytes, RedeliveryTracker redeliveryTracker) {
+ int totalMessages, long totalBytes, long totalChunkedMessages, RedeliveryTracker redeliveryTracker) {
this.lastConsumedTimestamp = System.currentTimeMillis();
final ChannelHandlerContext ctx = cnx.ctx();
final ChannelPromise writePromise = ctx.newPromise();
@@ -258,6 +260,7 @@ public class Consumer {
msgOut.recordMultipleEvents(totalMessages, totalBytes);
msgOutCounter.add(totalMessages);
bytesOutCounter.add(totalBytes);
+ chuckedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
ctx.channel().eventLoop().execute(() -> {
for (int i = 0; i < entries.size(); i++) {
@@ -505,10 +508,12 @@ public class Consumer {
public void updateRates() {
msgOut.calculateRate();
+ chuckedMessageRate.calculateRate();
msgRedeliver.calculateRate();
stats.msgRateOut = msgOut.getRate();
stats.msgThroughputOut = msgOut.getValueRate();
stats.msgRateRedeliver = msgRedeliver.getRate();
+ stats.chuckedMessageRate = chuckedMessageRate.getRate();
}
public ConsumerStats getStats() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index eeb12e9..b19c4b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -66,6 +66,7 @@ public class Producer {
private final long producerId;
private final String appId;
private Rate msgIn;
+ private Rate chuckedMessageRate;
// it records msg-drop rate only for non-persistent topic
private final Rate msgDrop;
private AuthenticationDataSource authenticationData;
@@ -100,6 +101,7 @@ public class Producer {
this.appId = appId;
this.authenticationData = cnx.authenticationData;
this.msgIn = new Rate();
+ this.chuckedMessageRate = new Rate();
this.isNonPersistentTopic = topic instanceof NonPersistentTopic;
this.msgDrop = this.isNonPersistentTopic ? new Rate() : null;
@@ -136,13 +138,13 @@ public class Producer {
return false;
}
- public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
+ public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
beforePublish(producerId, sequenceId, headersAndPayload, batchSize);
- publishMessageToTopic(headersAndPayload, sequenceId, batchSize);
+ publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked);
}
public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
- ByteBuf headersAndPayload, long batchSize) {
+ ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
if (lowestSequenceId > highestSequenceId) {
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, highestSequenceId, ServerError.MetadataError,
@@ -152,7 +154,7 @@ public class Producer {
return;
}
beforePublish(producerId, highestSequenceId, headersAndPayload, batchSize);
- publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize);
+ publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked);
}
public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
@@ -197,16 +199,16 @@ public class Producer {
startPublishOperation((int) batchSize, headersAndPayload.readableBytes());
}
- private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize) {
+ private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked) {
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
- System.nanoTime()));
+ isChunked, System.nanoTime()));
}
- private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, long batchSize) {
+ private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, long batchSize, boolean isChunked) {
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, lowestSequenceId, highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
- System.nanoTime()));
+ isChunked, System.nanoTime()));
}
private boolean verifyChecksum(ByteBuf headersAndPayload) {
@@ -286,6 +288,8 @@ public class Producer {
private Rate rateIn;
private int msgSize;
private long batchSize;
+ private boolean chunked;
+
private long startTimeNs;
private String originalProducerName;
@@ -302,6 +306,10 @@ public class Producer {
return sequenceId;
}
+ public boolean isChunked() {
+ return chunked;
+ }
+
@Override
public long getHighestSequenceId() {
return highestSequenceId;
@@ -387,18 +395,22 @@ public class Producer {
Commands.newSendReceipt(producer.producerId, sequenceId, highestSequenceId, ledgerId, entryId),
producer.cnx.ctx().voidPromise());
producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize);
+ if (this.chunked) {
+ producer.chuckedMessageRate.recordEvent();
+ }
producer.publishOperationCompleted();
recycle();
}
static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize,
- long batchSize, long startTimeNs) {
+ long batchSize, boolean chunked, long startTimeNs) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = sequenceId;
callback.rateIn = rateIn;
callback.msgSize = msgSize;
callback.batchSize = batchSize;
+ callback.chunked = chunked;
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
@@ -406,7 +418,7 @@ public class Producer {
}
static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
- int msgSize, long batchSize, long startTimeNs) {
+ int msgSize, long batchSize, boolean chunked, long startTimeNs) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = lowestSequenceId;
@@ -417,6 +429,7 @@ public class Producer {
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
+ callback.chunked = chunked;
return callback;
}
@@ -444,6 +457,11 @@ public class Producer {
entryId = -1L;
batchSize = 0L;
startTimeNs = -1L;
+ ledgerId = -1;
+ entryId = -1;
+ batchSize = 0;
+ chunked = false;
+ startTimeNs = -1;
recyclerHandle.recycle(this);
}
}
@@ -525,9 +543,14 @@ public class Producer {
public void updateRates() {
msgIn.calculateRate();
+ chuckedMessageRate.calculateRate();
stats.msgRateIn = msgIn.getRate();
stats.msgThroughputIn = msgIn.getValueRate();
stats.averageMsgSize = msgIn.getAverageValue();
+ stats.chunkedMessageRate = chuckedMessageRate.getRate();
+ if (chuckedMessageRate.getCount() > 0 && this.topic instanceof PersistentTopic) {
+ ((PersistentTopic) this.topic).msgChunkPublished = true;
+ }
if (this.isNonPersistentTopic) {
msgDrop.calculateRate();
((NonPersistentPublisherStats) stats).msgDropRate = msgDrop.getRate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SendMessageInfo.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SendMessageInfo.java
index 3cc0ca2..a5df8dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SendMessageInfo.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SendMessageInfo.java
@@ -26,6 +26,7 @@ import lombok.Data;
public class SendMessageInfo {
private int totalMessages;
private long totalBytes;
+ private int totalChunkedMessages;
private SendMessageInfo() {
// Private constructor so that all usages are through the thread-local instance
@@ -35,6 +36,7 @@ public class SendMessageInfo {
SendMessageInfo smi = THREAD_LOCAL.get();
smi.totalMessages = 0;
smi.totalBytes = 0;
+ smi.totalChunkedMessages = 0;
return smi;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 669b472..125d9bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1214,9 +1214,10 @@ public class ServerCnx extends PulsarHandler {
// Persist the message
if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
- headersAndPayload, send.getNumMessages());
+ headersAndPayload, send.getNumMessages(), send.getIsChunk());
} else {
- producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages());
+ producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
+ send.getNumMessages(), send.getIsChunk());
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index d97e0db..2053f5c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -203,7 +203,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null);
consumer.sendMessages(entries, batchSizes, null, sendMessageInfo.getTotalMessages(),
- sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
+ sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
} else {
@@ -234,4 +234,4 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
private static final Logger log = LoggerFactory.getLogger(NonPersistentDispatcherMultipleConsumers.class);
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 7db50cc..160d165 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -64,7 +64,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null);
currentConsumer.sendMessages(entries, batchSizes, null, sendMessageInfo.getTotalMessages(),
- sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
+ sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
} else {
entries.forEach(entry -> {
int totalMsgs = Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), subscription.toString(), -1);
@@ -154,4 +154,4 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
protected void cancelPendingRead() {
// No-op
}
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 76ac571..b01a432 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -81,7 +81,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size());
filterEntriesForConsumer(entriesWithSameKey.getValue(), batchSizes, sendMessageInfo, null, null);
consumer.sendMessages(entriesWithSameKey.getValue(), batchSizes, null, sendMessageInfo.getTotalMessages(),
- sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
+ sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
} else {
entries.forEach(entry -> {
@@ -95,4 +95,4 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
}
}
}
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 7514103..1402b40 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -43,6 +43,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadE
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.logging.log4j.util.Strings;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
@@ -523,7 +524,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
- sendMessageInfo.getTotalBytes(), redeliveryTracker);
+ sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
int msgSent = sendMessageInfo.getTotalMessages();
start += messagesForC;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4d856e5..263bd7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -246,7 +246,8 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
currentConsumer
.sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
- sendMessageInfo.getTotalBytes(), redeliveryTracker)
+ sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
+ redeliveryTracker)
.addListener(future -> {
if (future.isSuccess()) {
// acquire message-dispatch permits for already delivered messages
@@ -555,4 +556,4 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
}
private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 22e45d5..6079d31 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -116,11 +116,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
filterEntriesForConsumer(subList, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
consumer.sendMessages(subList, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
- sendMessageInfo.getTotalBytes(), getRedeliveryTracker()).addListener(future -> {
+ sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
+ getRedeliveryTracker()).addListener(future -> {
if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
readMoreEntries();
}
- });
+ });
for (int i = 0; i < messagesForC; i++) {
entriesWithSameKey.getValue().remove(0);
@@ -175,4 +176,4 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 041f511..c207832 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -997,6 +997,7 @@ public class PersistentSubscription implements Subscription {
subStats.bytesOutCounter += consumerStats.bytesOutCounter;
subStats.msgOutCounter += consumerStats.msgOutCounter;
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
+ subStats.chuckedMessageRate += consumerStats.chuckedMessageRate;
subStats.unackedMessages += consumerStats.unackedMessages;
subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ad2c57e..3b6ea4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -153,6 +153,17 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
+ private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
+
+ // Timestamp of when this topic was last seen active
+ private volatile long lastActive;
+
+ // topic has every published chunked message since topic is loaded
+ public boolean msgChunkPublished;
+
+ // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
+ // doesn't support batch-message
+ private volatile boolean hasBatchMessagePublished = false;
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
public volatile long delayedDeliveryTickTimeMillis = 1000;
@@ -1516,6 +1527,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
+ stats.msgChunkPublished = this.msgChunkPublished;
subscriptions.forEach((name, subscription) -> {
SubscriptionStats subStats = subscription.getStats(getPreciseBacklog);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 7d1935a..99822e9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -103,6 +103,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
any(EntryBatchIndexesAcks.class),
anyInt(),
anyLong(),
+ anyLong(),
any(RedeliveryTracker.class)
);
@@ -146,6 +147,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
any(EntryBatchIndexesAcks.class),
totalMessagesCaptor.capture(),
anyLong(),
+ anyLong(),
any(RedeliveryTracker.class)
);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
new file mode 100644
index 0000000..80dd173
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
+import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
+import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.protocol.ByteBufPair;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Commands.ChecksumType;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class MessageChunkingTest extends ProducerConsumerBase {
+ private static final Logger log = LoggerFactory.getLogger(MessageChunkingTest.class);
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterMethod
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testInvalidConfig() throws Exception {
+ final String topicName = "persistent://my-property/my-ns/my-topic1";
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+ // batching and chunking can't be enabled together
+ try {
+ Producer<byte[]> producer = producerBuilder.enableChunking(true).enableBatching(true).create();
+ fail("producer creation should have fail");
+ } catch (IllegalArgumentException ie) {
+ // Ok
+ }
+ }
+
+ @Test
+ public void testLargeMessage() throws Exception {
+
+ log.info("-- Starting {} test --", methodName);
+ this.conf.setMaxMessageSize(5);
+ final int totalMessages = 5;
+ final String topicName = "persistent://my-property/my-ns/my-topic1";
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+
+ Producer<byte[]> producer = producerBuilder.compressionType(CompressionType.LZ4).enableChunking(true)
+ .enableBatching(false).create();
+
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
+
+ List<String> publishedMessages = Lists.newArrayList();
+ for (int i = 0; i < totalMessages; i++) {
+ String message = createMessagePayload(i * 10);
+ publishedMessages.add(message);
+ producer.send(message.getBytes());
+ }
+
+ Message<byte[]> msg = null;
+ Set<String> messageSet = Sets.newHashSet();
+ List<MessageId> msgIds = Lists.newArrayList();
+ for (int i = 0; i < totalMessages; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.info("[{}] - Published [{}] Received message: [{}]", i, publishedMessages.get(i), receivedMessage);
+ String expectedMessage = publishedMessages.get(i);
+ testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+ msgIds.add(msg.getMessageId());
+ }
+
+ pulsar.getBrokerService().updateRates();
+
+ PublisherStats producerStats = topic.getStats(false).publishers.get(0);
+
+ assertTrue(producerStats.chunkedMessageRate > 0);
+
+ ManagedCursorImpl mcursor = (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
+ PositionImpl readPosition = (PositionImpl) mcursor.getReadPosition();
+
+ for (MessageId msgId : msgIds) {
+ consumer.acknowledge(msgId);
+ }
+
+ retryStrategically((test) -> {
+ return mcursor.getMarkDeletedPosition().getNext().equals(readPosition);
+ }, 5, 200);
+
+ assertEquals(readPosition, mcursor.getMarkDeletedPosition().getNext());
+
+ assertEquals(readPosition.getEntryId(), ((ConsumerImpl) consumer).getAvailablePermits());
+
+ consumer.close();
+ producer.close();
+ log.info("-- Exiting {} test --", methodName);
+
+ }
+
+ @Test
+ public void testLargeMessageAckTimeOut() throws Exception {
+
+ log.info("-- Starting {} test --", methodName);
+ this.conf.setMaxMessageSize(5);
+ final int totalMessages = 5;
+ final String topicName = "persistent://my-property/my-ns/my-topic1";
+
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .ackTimeout(5, TimeUnit.SECONDS).subscribe();
+
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+
+ Producer<byte[]> producer = producerBuilder.enableChunking(true).enableBatching(false).create();
+
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
+
+ List<String> publishedMessages = Lists.newArrayList();
+ for (int i = 0; i < totalMessages; i++) {
+ String message = createMessagePayload(i * 10);
+ publishedMessages.add(message);
+ producer.send(message.getBytes());
+ }
+
+ Message<byte[]> msg = null;
+ Set<String> messageSet = Sets.newHashSet();
+ for (int i = 0; i < totalMessages; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.info("Received message: [{}]", receivedMessage);
+ String expectedMessage = publishedMessages.get(i);
+ testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+ }
+
+ retryStrategically((test) -> consumer.getUnAckedMessageTracker().messageIdPartitionMap.isEmpty(), 10,
+ TimeUnit.SECONDS.toMillis(1));
+
+ msg = null;
+ messageSet.clear();
+ MessageId lastMsgId = null;
+ for (int i = 0; i < totalMessages; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ lastMsgId = msg.getMessageId();
+ String receivedMessage = new String(msg.getData());
+ log.info("Received message: [{}]", receivedMessage);
+ String expectedMessage = publishedMessages.get(i);
+ testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+ }
+
+ ManagedCursorImpl mcursor = (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
+ PositionImpl readPosition = (PositionImpl) mcursor.getReadPosition();
+
+ consumer.acknowledgeCumulative(lastMsgId);
+
+ retryStrategically((test) -> {
+ return mcursor.getMarkDeletedPosition().getNext().equals(readPosition);
+ }, 5, 200);
+
+ assertEquals(readPosition, mcursor.getMarkDeletedPosition().getNext());
+
+ consumer.close();
+ producer.close();
+ log.info("-- Exiting {} test --", methodName);
+
+ }
+
+ @Test
+ public void testPublishWithFailure() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ this.conf.setMaxMessageSize(5);
+ final String topicName = "persistent://my-property/my-ns/my-topic1";
+
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+
+ Producer<byte[]> producer = producerBuilder.enableChunking(true).enableBatching(false)
+ .create();
+
+ stopBroker();
+
+ try {
+ producer.send(createMessagePayload(100).getBytes());
+ fail("should have failed with timeout exception");
+ } catch (PulsarClientException.TimeoutException e) {
+ // Ok
+ }
+ producer.close();
+ }
+
+ @Test(enabled = false)
+ public void testMaxPendingChunkMessages() throws Exception {
+
+ log.info("-- Starting {} test --", methodName);
+ this.conf.setMaxMessageSize(10);
+ final int totalMessages = 25;
+ final String topicName = "persistent://my-property/my-ns/maxPending";
+ final int totalProducers = 25;
+ ExecutorService executor = Executors.newFixedThreadPool(totalProducers);
+
+ try {
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .maxPendingChuckedMessage(1).autoAckOldestChunkedMessageOnQueueFull(true)
+ .ackTimeout(5, TimeUnit.SECONDS).subscribe();
+
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+
+ Producer<byte[]>[] producers = new Producer[totalProducers];
+ int totalPublishedMessages = totalProducers;
+ List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
+ for (int i = 0; i < totalProducers; i++) {
+ producers[i] = producerBuilder.enableChunking(true).enableBatching(false).create();
+ int index = i;
+ executor.submit(() -> {
+ futures.add(producers[index].sendAsync(createMessagePayload(45).getBytes()));
+ });
+ }
+
+ FutureUtil.waitForAll(futures).get();
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
+
+ Message<byte[]> msg = null;
+ Set<String> messageSet = Sets.newHashSet();
+ for (int i = 0; i < totalMessages; i++) {
+ msg = consumer.receive(1, TimeUnit.SECONDS);
+ if (msg == null) {
+ break;
+ }
+ String receivedMessage = new String(msg.getData());
+ log.info("Received message: [{}]", receivedMessage);
+ messageSet.add(receivedMessage);
+ consumer.acknowledge(msg);
+ }
+
+ assertNotEquals(messageSet.size(), totalPublishedMessages);
+ } finally {
+ executor.shutdown();
+ }
+
+ }
+
+ /**
+ * Validate that chunking is not supported with batching and non-persistent topic
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testInvalidUseCaseForChunking() throws Exception {
+
+ log.info("-- Starting {} test --", methodName);
+ this.conf.setMaxMessageSize(5);
+ final String topicName = "persistent://my-property/my-ns/my-topic1";
+
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+
+ try {
+ Producer<byte[]> producer = producerBuilder.enableChunking(true).enableBatching(true).create();
+ fail("it should have failied because chunking can't be used with batching enabled");
+ } catch (IllegalArgumentException ie) {
+ // Ok
+ }
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ @Test
+ public void testExpireIncompleteChunkMessage() throws Exception{
+ final String topicName = "persistent://prop/use/ns-abc/expireMsg";
+
+ // 1. producer connect
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .sendTimeout(10, TimeUnit.MINUTES)
+ .create();
+ Field producerIdField = ProducerImpl.class.getDeclaredField("producerId");
+ producerIdField.setAccessible(true);
+ long producerId = (long) producerIdField.get(producer);
+ producer.cnx().registerProducer(producerId, producer); // registered spy ProducerImpl
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("my-sub").subscribe();
+
+ TypedMessageBuilderImpl<byte[]> msg = (TypedMessageBuilderImpl<byte[]>) producer.newMessage().value("message-1".getBytes());
+ ByteBuf payload = Unpooled.wrappedBuffer(msg.getContent());
+ Builder metadataBuilder = ((TypedMessageBuilderImpl<byte[]>) msg).getMetadataBuilder();
+ MessageMetadata msgMetadata = metadataBuilder.setProducerName("test").setSequenceId(1).setPublishTime(10L)
+ .setUuid("123").setNumChunksFromMsg(2).setChunkId(0).setTotalChunkMsgSize(100).build();
+ ByteBufPair cmd = Commands.newSend(producerId, 1, 1, ChecksumType.Crc32c, msgMetadata, payload);
+ MessageImpl msgImpl = ((MessageImpl<byte[]>) msg.getMessage());
+ msgImpl.setSchemaState(SchemaState.Ready);
+ OpSendMsg op = OpSendMsg.create(msgImpl, cmd, 1, null);
+ producer.processOpSendMsg(op);
+
+ retryStrategically((test) -> {
+ return consumer.chunkedMessagesMap.size() > 0;
+ }, 5, 500);
+ assertEquals(consumer.chunkedMessagesMap.size(), 1);
+
+ consumer.expireTimeOfIncompleteChunkedMessageMillis = 1;
+ Thread.sleep(10);
+ consumer.removeExpireIncompleteChunkedMessages();
+ assertEquals(consumer.chunkedMessagesMap.size(), 0);
+
+ producer.close();
+ consumer.close();
+ producer = null; // clean reference of mocked producer
+ }
+
+ private String createMessagePayload(int size) {
+ StringBuilder str = new StringBuilder();
+ Random rand = new Random();
+ for (int i = 0; i < size; i++) {
+ str.append(rand.nextInt(10));
+ }
+ return str.toString();
+ }
+
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 461ba70..def1807 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -609,4 +609,49 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> enableRetry(boolean retryEnable);
+ /**
+ * Consumer buffers chunk messages into memory until it receives all the chunks of the original message. While
+ * consuming chunk-messages, chunks from same message might not be contiguous in the stream and they might be mixed
+ * with other messages' chunks. so, consumer has to maintain multiple buffers to manage chunks coming from different
+ * messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently or
+ * publisher failed to publish all chunks of the messages.
+ *
+ * <pre>
+ * eg: M1-C1, M2-C1, M1-C2, M2-C2
+ * Here, Messages M1-C1 and M1-C2 belong to original message M1, M2-C1 and M2-C2 messages belong to M2 message.
+ * </pre>
+ * Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it can be
+ * guarded by providing this @maxPendingChuckedMessage threshold. Once, consumer reaches this threshold, it drops
+ * the outstanding unchunked-messages by silently acking or asking broker to redeliver later by marking it unacked.
+ * This behavior can be controlled by configuration: @autoAckOldestChunkedMessageOnQueueFull
+ *
+ * @default 100
+ *
+ * @param maxPendingChuckedMessage
+ * @return
+ */
+ ConsumerBuilder<T> maxPendingChuckedMessage(int maxPendingChuckedMessage);
+
+ /**
+ * Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it can be
+ * guarded by providing this @maxPendingChuckedMessage threshold. Once, consumer reaches this threshold, it drops
+ * the outstanding unchunked-messages by silently acking if autoAckOldestChunkedMessageOnQueueFull is true else it
+ * marks them for redelivery.
+ *
+ * @default false
+ *
+ * @param autoAckOldestChunkedMessageOnQueueFull
+ * @return
+ */
+ ConsumerBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOldestChunkedMessageOnQueueFull);
+
+ /**
+ * If producer fails to publish all the chunks of a message then consumer can expire incomplete chunks if consumer
+ * won't be able to receive all chunks in expire times (default 1 hour).
+ *
+ * @param duration
+ * @param unit
+ * @return
+ */
+ ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit);
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index c4a250e..f94e6c2 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -293,6 +293,33 @@ public interface ProducerBuilder<T> extends Cloneable {
ProducerBuilder<T> enableBatching(boolean enableBatching);
/**
+ * If message size is higher than allowed max publish-payload size by broker then enableChunking helps producer to
+ * split message into multiple chunks and publish them to broker separately and in order. So, it allows client to
+ * successfully publish large size of messages in pulsar.
+ *
+ * <p>This feature allows publisher to publish large size of message by splitting it to multiple chunks and let
+ * consumer stitch them together to form a original large published message. Therefore, it's necessary to configure
+ * recommended configuration at pulsar producer and consumer. Recommendation to use this feature:
+ *
+ * <pre>
+ * 1. This feature is right now only supported by non-shared subscription and persistent-topic.
+ * 2. Disable batching to use chunking feature
+ * 3. Pulsar-client keeps published messages into buffer until it receives ack from broker.
+ * So, it's better to reduce "maxPendingMessages" size to avoid producer occupying large amount
+ * of memory by buffered messages.
+ * 4. Set message-ttl on the namespace to cleanup incomplete chunked messages.
+ * (sometime due to broker-restart or publish time, producer might fail to publish entire large message
+ * so, consumer will not be able to consume and ack those messages. So, those messages can
+ * be only discarded by msg ttl) Or configure
+ * {@link ConsumerBuilder#expireTimeOfIncompleteChunkedMessage(long, TimeUnit)}
+ * 5. Consumer configuration: consumer should also configure receiverQueueSize and maxPendingChuckedMessage
+ * </pre>
+ * @param enableChunking
+ * @return
+ */
+ ProducerBuilder<T> enableChunking(boolean enableChunking);
+
+ /**
* Sets a {@link CryptoKeyReader}.
*
* <p>Configure the key reader to be used to encrypt the message payloads.
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 673a45e..1812a96 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -101,7 +101,17 @@ public class CmdConsume {
@Parameter(names = { "--regex" }, description = "Indicate the topic name is a regex pattern")
private boolean isRegex = false;
+
+ @Parameter(names = { "-q", "--queue-size" }, description = "Consumer receiver queue size.")
+ private int receiverQueueSize = 0;
+ @Parameter(names = { "-mc", "--max_chunked_msg" }, description = "Max pending chunk messages")
+ private int maxPendingChuckedMessage = 0;
+
+ @Parameter(names = { "-ac",
+ "--auto_ack_chunk_q_full" }, description = "Auto ack for oldest message on queue is full")
+ private boolean autoAckOldestChunkedMessageOnQueueFull = false;
+
private ClientBuilder clientBuilder;
private Authentication authentication;
private String serviceURL;
@@ -195,7 +205,16 @@ public class CmdConsume {
builder.topic(topic);
}
- Consumer<byte[]> consumer = builder.subscribe();
+ ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer().topic(topic);
+ if (this.maxPendingChuckedMessage > 0) {
+ consumerBuilder.maxPendingChuckedMessage(this.maxPendingChuckedMessage);
+ }
+ if (this.receiverQueueSize > 0) {
+ consumerBuilder.receiverQueueSize(this.receiverQueueSize);
+ }
+ Consumer<byte[]> consumer = consumerBuilder.subscriptionName(this.subscriptionName)
+ .autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull)
+ .subscriptionType(subscriptionType).subscribe();
RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null;
while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index f143ef3..9b67a08 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -44,7 +44,9 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
@@ -94,6 +96,10 @@ public class CmdProduce {
description = "Rate (in msg/sec) at which to produce," +
" value 0 means to produce messages as fast as possible.")
private double publishRate = 0;
+
+ @Parameter(names = { "-c",
+ "--chunking" }, description = "Should split the message and publish in chunks if message size is larger than allowed max size")
+ private boolean chunkingAllowed = false;
@Parameter(names = { "-s", "--separator" },
description = "Character to split messages string on default is comma")
@@ -197,7 +203,12 @@ public class CmdProduce {
try {
PulsarClient client = clientBuilder.build();
- Producer<byte[]> producer = client.newProducer().topic(topic).create();
+ ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topic);
+ if (this.chunkingAllowed) {
+ producerBuilder.enableChunking(true);
+ producerBuilder.enableBatching(false);
+ }
+ Producer<byte[]> producer = producerBuilder.create();
List<byte[]> messageBodies = generateMessageBodies(this.messages, this.messageFileNames);
RateLimiter limiter = (this.publishRate > 0) ? RateLimiter.create(this.publishRate) : null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index a0e6acd..3d45931 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +68,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected final ConsumerEventListener consumerEventListener;
protected final ExecutorService listenerExecutor;
final BlockingQueue<Message<T>> incomingMessages;
+ protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunckedMessageIdSequenceMap;
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
protected int maxReceiverQueueSize;
protected final Schema<T> schema;
@@ -91,6 +93,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
this.consumerEventListener = conf.getConsumerEventListener();
// Always use growable queue since items can exceed the advertised size
this.incomingMessages = new GrowableArrayBlockingQueue<>();
+ this.unAckedChunckedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
this.listenerExecutor = listenerExecutor;
this.pendingReceives = Queues.newConcurrentLinkedQueue();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index a1ef384..9240ab3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -280,6 +280,18 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
}
@Override
+ public ConsumerBuilder<T> maxPendingChuckedMessage(int maxPendingChuckedMessage) {
+ conf.setMaxPendingChuckedMessage(maxPendingChuckedMessage);
+ return this;
+ }
+
+ @Override
+ public ConsumerBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOldestChunkedMessageOnQueueFull) {
+ conf.setAutoAckOldestChunkedMessageOnQueueFull(autoAckOldestChunkedMessageOnQueueFull);
+ return this;
+ }
+
+ @Override
public ConsumerBuilder<T> property(String key, String value) {
checkArgument(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value),
"property key/value cannot be blank");
@@ -403,4 +415,10 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
return this;
}
+ @Override
+ public ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit) {
+ conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration));
+ return this;
+ }
+
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 662222f..d573d82 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,10 +25,14 @@ import static org.apache.pulsar.common.protocol.Commands.readChecksum;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
-
+import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.util.Timeout;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.util.ArrayList;
@@ -36,6 +40,8 @@ import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -43,6 +49,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -98,6 +105,8 @@ import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -165,6 +174,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();
protected volatile boolean paused;
+
+ protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
+ private int pendingChunckedMessageCount = 0;
+ protected long expireTimeOfIncompleteChunkedMessageMillis = 0;
+ private boolean expireChunkMessageTaskScheduled = false;
+ private int maxPendingChuckedMessage;
+ // if queue size is reasonable (most of the time equal to number of producers try to publish messages concurrently on
+ // the topic) then it guards against broken chunked message which was not fully published
+ private boolean autoAckOldestChunkedMessageOnQueueFull;
+ // it will be used to manage N outstanding chunked message buffers
+ private final BlockingQueue<String> pendingChunckedMessageUuidQueue;
private final boolean createTopicIfDoesNotExist;
@@ -214,6 +234,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
this.resetIncludeHead = conf.isResetIncludeHead();
this.createTopicIfDoesNotExist = createTopicIfDoesNotExist;
+ this.maxPendingChuckedMessage = conf.getMaxPendingChuckedMessage();
+ this.pendingChunckedMessageUuidQueue = new GrowableArrayBlockingQueue<>();
+ this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
+ this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStatsRecorderImpl(client, conf, this);
@@ -1012,6 +1036,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
final int numMessages = msgMetadata.getNumMessagesInBatch();
+ final boolean isChunkedMessage = msgMetadata.getNumChunksFromMsg() > 1 && conf.getSubscriptionType() != SubscriptionType.Shared;
MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
@@ -1034,8 +1059,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
// uncompress decryptedPayload and release decryptedPayload-ByteBuf
- ByteBuf uncompressedPayload = isMessageUndecryptable ? decryptedPayload.retain()
- : uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload, cnx);
+ ByteBuf uncompressedPayload = (isMessageUndecryptable || isChunkedMessage) ? decryptedPayload.retain()
+ : uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload, cnx, true);
decryptedPayload.release();
if (uncompressedPayload == null) {
// Message was discarded on decompression error
@@ -1045,6 +1070,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
// if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message
// and return undecrypted payload
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
+
+ // right now, chunked messages are only supported by non-shared subscription
+ if (isChunkedMessage) {
+ uncompressedPayload = processMessageChunk(uncompressedPayload, msgMetadata, msgId, messageId, cnx);
+ if (uncompressedPayload == null) {
+ msgMetadata.recycle();
+ return;
+ }
+ }
if (isSameEntry(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
// We need to discard entries that were prior to startMessageId
@@ -1094,6 +1128,88 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
}
+ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId,
+ MessageIdData messageId, ClientCnx cnx) {
+
+ // Lazy task scheduling to expire incomplete chunk message
+ if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
+ ((ScheduledExecutorService) listenerExecutor).scheduleAtFixedRate(() -> {
+ removeExpireIncompleteChunkedMessages();
+ }, expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
+ TimeUnit.MILLISECONDS);
+ expireChunkMessageTaskScheduled = true;
+ }
+
+ if (msgMetadata.getChunkId() == 0) {
+ ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(msgMetadata.getTotalChunkMsgSize(),
+ msgMetadata.getTotalChunkMsgSize());
+ int totalChunks = msgMetadata.getNumChunksFromMsg();
+ chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
+ (key) -> ChunkedMessageCtx.get(totalChunks, chunkedMsgBuffer));
+ pendingChunckedMessageCount++;
+ if (maxPendingChuckedMessage > 0 && pendingChunckedMessageCount > maxPendingChuckedMessage) {
+ removeOldestPendingChunkedMessage();
+ }
+ pendingChunckedMessageUuidQueue.add(msgMetadata.getUuid());
+ }
+
+ ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(msgMetadata.getUuid());
+ // discard message if chunk is out-of-order
+ if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+ || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)
+ || msgMetadata.getChunkId() >= msgMetadata.getTotalChunkMsgSize()) {
+ // either the first chunk was never received or this chunk arrived out of order: discard the whole message
+ log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}, total-chunks {}", msgId,
+ (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId(),
+ msgMetadata.getTotalChunkMsgSize());
+ if (chunkedMsgCtx != null) {
+ if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+ ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+ }
+ chunkedMsgCtx.recycle();
+ }
+ chunkedMessagesMap.remove(msgMetadata.getUuid());
+ compressedPayload.release();
+ increaseAvailablePermits(cnx);
+ if (expireTimeOfIncompleteChunkedMessageMillis > 0
+ && System.currentTimeMillis() > (msgMetadata.getPublishTime()
+ + expireTimeOfIncompleteChunkedMessageMillis)) {
+ doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
+ } else {
+ trackMessage(msgId);
+ }
+ return null;
+ }
+
+ chunkedMsgCtx.chunkedMessageIds[msgMetadata.getChunkId()] = msgId;
+ // append the chunked payload and update lastChunkedMessage-id
+ chunkedMsgCtx.chunkedMsgBuffer.writeBytes(compressedPayload);
+ chunkedMsgCtx.lastChunkedMessageId = msgMetadata.getChunkId();
+
+ // if final chunk is not received yet then release payload and return
+ if (msgMetadata.getChunkId() != (msgMetadata.getNumChunksFromMsg() - 1)) {
+ compressedPayload.release();
+ increaseAvailablePermits(cnx);
+ return null;
+ }
+
+ // last chunk received: so, stitch chunked-messages and clear up chunkedMsgBuffer
+ if (log.isDebugEnabled()) {
+ log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}",
+ msgMetadata.getChunkId(), msgMetadata.getNumChunksFromMsg(), msgId, msgMetadata.getSequenceId());
+ }
+ // remove buffer from the map, add chunked messageId to unack-message tracker, and reduce pending-chunked-message count
+ chunkedMessagesMap.remove(msgMetadata.getUuid());
+ unAckedChunckedMessageIdSequenceMap.put(msgId, chunkedMsgCtx.chunkedMessageIds);
+ pendingChunckedMessageCount--;
+ compressedPayload.release();
+ compressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
+ chunkedMsgCtx.recycle();
+ ByteBuf uncompressedPayload = uncompressPayloadIfNeeded(messageId, msgMetadata, compressedPayload, cnx, false);
+ compressedPayload.release();
+ return uncompressedPayload;
+ }
+
protected void triggerListener(int numMessages) {
// Trigger the notification on the message listener in a separate thread to avoid blocking the networking
// thread while the message processing happens
@@ -1307,19 +1423,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
protected void trackMessage(Message<?> msg) {
if (msg != null) {
- MessageId messageId = msg.getMessageId();
- if (conf.getAckTimeoutMillis() > 0 && messageId instanceof MessageIdImpl) {
- MessageIdImpl id = (MessageIdImpl)messageId;
- if (id instanceof BatchMessageIdImpl) {
- // do not add each item in batch message into tracker
- id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
- }
- if (hasParentConsumer) {
- // we should no longer track this message, TopicsConsumer will take care from now onwards
- unAckedMessageTracker.remove(id);
- } else {
- unAckedMessageTracker.add(id);
- }
+ trackMessage(msg.getMessageId());
+ }
+ }
+
+ protected void trackMessage(MessageId messageId) {
+ if (conf.getAckTimeoutMillis() > 0 && messageId instanceof MessageIdImpl) {
+ MessageIdImpl id = (MessageIdImpl)messageId;
+ if (id instanceof BatchMessageIdImpl) {
+ // do not add each item in batch message into tracker
+ id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
+ }
+ if (hasParentConsumer) {
+ //TODO: check parent consumer here
+ // we should no longer track this message, TopicsConsumer will take care from now onwards
+ unAckedMessageTracker.remove(id);
+ } else {
+ unAckedMessageTracker.add(id);
}
}
}
@@ -1412,19 +1532,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload,
- ClientCnx currentCnx) {
+ ClientCnx currentCnx, boolean checkMaxMessageSize) {
CompressionType compressionType = msgMetadata.getCompression();
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
int uncompressedSize = msgMetadata.getUncompressedSize();
int payloadSize = payload.readableBytes();
- if (payloadSize > ClientCnx.getMaxMessageSize()) {
+ if (checkMaxMessageSize && payloadSize > ClientCnx.getMaxMessageSize()) {
// payload size is itself corrupted since it cannot be bigger than the MaxMessageSize
log.error("[{}][{}] Got corrupted payload message size {} at {}", topic, subscription, payloadSize,
- messageId);
+ messageId);
discardCorruptedMessage(messageId, currentCnx, ValidationError.UncompressedSizeCorruption);
return null;
}
-
try {
ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
return uncompressedPayload;
@@ -2035,6 +2154,99 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return topicNameWithoutPartition;
}
+ static class ChunkedMessageCtx {
+
+ protected int totalChunks = -1;
+ protected ByteBuf chunkedMsgBuffer;
+ protected int lastChunkedMessageId = -1;
+ protected MessageIdImpl[] chunkedMessageIds;
+ protected long receivedTime = 0;
+
+ static ChunkedMessageCtx get(int numChunksFromMsg, ByteBuf chunkedMsgBuffer) {
+ ChunkedMessageCtx ctx = RECYCLER.get();
+ ctx.totalChunks = numChunksFromMsg;
+ ctx.chunkedMsgBuffer = chunkedMsgBuffer;
+ ctx.chunkedMessageIds = new MessageIdImpl[numChunksFromMsg];
+ ctx.receivedTime = System.currentTimeMillis();
+ return ctx;
+ }
+
+ private final Handle<ChunkedMessageCtx> recyclerHandle;
+
+ private ChunkedMessageCtx(Handle<ChunkedMessageCtx> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private static final Recycler<ChunkedMessageCtx> RECYCLER = new Recycler<ChunkedMessageCtx>() {
+ protected ChunkedMessageCtx newObject(Recycler.Handle<ChunkedMessageCtx> handle) {
+ return new ChunkedMessageCtx(handle);
+ }
+ };
+
+ public void recycle() {
+ this.totalChunks = -1;
+ this.chunkedMsgBuffer = null;
+ this.lastChunkedMessageId = -1;
+ recyclerHandle.recycle(this);
+ }
+ }
+
+ private void removeOldestPendingChunkedMessage() {
+ ChunkedMessageCtx chunkedMsgCtx = null;
+ String firstPendingMsgUuid = null;
+ while (chunkedMsgCtx == null && !pendingChunckedMessageUuidQueue.isEmpty()) {
+ // remove oldest pending chunked-message group and free memory
+ firstPendingMsgUuid = pendingChunckedMessageUuidQueue.poll();
+ chunkedMsgCtx = StringUtils.isNotBlank(firstPendingMsgUuid) ? chunkedMessagesMap.get(firstPendingMsgUuid)
+ : null;
+ }
+ removeChunkMessage(firstPendingMsgUuid, chunkedMsgCtx, this.autoAckOldestChunkedMessageOnQueueFull);
+ }
+
+ protected void removeExpireIncompleteChunkedMessages() {
+ if (expireTimeOfIncompleteChunkedMessageMillis <= 0) {
+ return;
+ }
+ ChunkedMessageCtx chunkedMsgCtx = null;
+ String messageUUID;
+ while ((messageUUID = pendingChunckedMessageUuidQueue.peek()) != null) {
+ chunkedMsgCtx = StringUtils.isNotBlank(messageUUID) ? chunkedMessagesMap.get(messageUUID) : null;
+ if (chunkedMsgCtx != null && System
+ .currentTimeMillis() > (chunkedMsgCtx.receivedTime + expireTimeOfIncompleteChunkedMessageMillis)) {
+ pendingChunckedMessageUuidQueue.remove(messageUUID);
+ removeChunkMessage(messageUUID, chunkedMsgCtx, true);
+ } else {
+ return;
+ }
+ }
+ }
+
+ private void removeChunkMessage(String msgUUID, ChunkedMessageCtx chunkedMsgCtx, boolean autoAck) {
+ if (chunkedMsgCtx == null) {
+ return;
+ }
+ // clean up the pending chunked message
+ chunkedMessagesMap.remove(msgUUID);
+ if (chunkedMsgCtx.chunkedMessageIds != null) {
+ for (MessageIdImpl msgId : chunkedMsgCtx.chunkedMessageIds) {
+ if (msgId == null) {
+ continue;
+ }
+ if (autoAck) {
+ log.info("Removing chunk message-id {}", msgId);
+ doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
+ } else {
+ trackMessage(msgId);
+ }
+ }
+ }
+ if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+ chunkedMsgCtx.chunkedMsgBuffer.release();
+ }
+ chunkedMsgCtx.recycle();
+ pendingChunckedMessageCount--;
+ }
+
private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);
-}
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index ea5dc67..f3147ce 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -618,7 +618,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
public void redeliverUnacknowledgedMessages() {
lock.writeLock().lock();
try {
- consumers.values().stream().forEach(consumer -> consumer.redeliverUnacknowledgedMessages());
+ consumers.values().stream().forEach(consumer -> {
+ consumer.redeliverUnacknowledgedMessages();
+ consumer.unAckedChunckedMessageIdSequenceMap.clear();
+ });
incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
unAckedMessageTracker.clear();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index da1b738..bbdd786 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequnceMap;
class NegativeAcksTracker {
@@ -62,6 +63,7 @@ class NegativeAcksTracker {
long now = System.nanoTime();
nackedMessages.forEach((msgId, timestamp) -> {
if (timestamp < now) {
+ addChunkedMessageIdsAndRemoveFromSequnceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
}
});
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 927594c..937f005 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
@@ -185,10 +186,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
return false;
}
- final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), null, ackType, null,
- properties);
-
- cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+ newAckCommand(consumer.consumerId, msgId, null, ackType, null, properties, cnx, true /* flush */);
return true;
}
@@ -226,9 +224,8 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
boolean shouldFlush = false;
if (cumulativeAckFlushRequired) {
- ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.ledgerId, lastCumulativeAck.entryId, lastCumulativeAckSet,
- AckType.Cumulative, null, Collections.emptyMap());
- cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+ newAckCommand(consumer.consumerId, lastCumulativeAck, lastCumulativeAckSet, AckType.Cumulative, null, Collections.emptyMap(), cnx,
+ false /* flush */);
shouldFlush=true;
cumulativeAckFlushRequired = false;
}
@@ -244,7 +241,20 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
break;
}
- entriesToAck.add(Triple.of(msgId.getLedgerId(), msgId.getEntryId(), null));
+ // if this messageId is acked then all chunks belonging to that message have been processed, so ack all of
+ // them
+ MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunckedMessageIdSequenceMap.get(msgId);
+ if (chunkMsgIds != null && chunkMsgIds.length > 1) {
+ for (MessageIdImpl cMsgId : chunkMsgIds) {
+ if (cMsgId != null) {
+ entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
+ }
+ }
+ // the chunk messages will be acked, so remove the tracked message-id sequence
+ this.consumer.unAckedChunckedMessageIdSequenceMap.remove(msgId);
+ } else {
+ entriesToAck.add(Triple.of(msgId.getLedgerId(), msgId.getEntryId(), null));
+ }
}
} else {
// When talking to older brokers, send the acknowledgements individually
@@ -254,8 +264,8 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
break;
}
- cnx.ctx().write(Commands.newAck(consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), null,
- AckType.Individual, null, Collections.emptyMap()), cnx.ctx().voidPromise());
+ newAckCommand(consumer.consumerId, msgId, null, AckType.Individual, null, Collections.emptyMap(),
+ cnx, false);
shouldFlush = true;
}
}
@@ -300,4 +310,45 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
scheduledTask.cancel(true);
}
}
-}
+
+ private void newAckCommand(long consumerId, MessageIdImpl msgId, BitSetRecyclable lastCumulativeAckSet,
+ AckType ackType, ValidationError validationError, Map<String, Long> map, ClientCnx cnx, boolean flush) {
+
+ MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunckedMessageIdSequenceMap.get(msgId);
+ if (chunkMsgIds != null) {
+ if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()) && ackType != AckType.Cumulative) {
+ List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
+ for (MessageIdImpl cMsgId : chunkMsgIds) {
+ if (cMsgId != null && chunkMsgIds.length > 1) {
+ entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
+ }
+ }
+ ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck);
+ if (flush) {
+ cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+ } else {
+ cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+ }
+ } else {
+ for (MessageIdImpl cMsgId : chunkMsgIds) {
+ ByteBuf cmd = Commands.newAck(consumerId, cMsgId.getLedgerId(), cMsgId.getEntryId(),
+ lastCumulativeAckSet, ackType, validationError, map);
+ if (flush) {
+ cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+ } else {
+ cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+ }
+ }
+ }
+ this.consumer.unAckedChunckedMessageIdSequenceMap.remove(msgId);
+ } else {
+ ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(), lastCumulativeAckSet,
+ ackType, validationError, map);
+ if (flush) {
+ cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+ } else {
+ cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 12a52e3..9aaec55 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -96,6 +96,9 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
@Override
public CompletableFuture<Producer<T>> createAsync() {
+ // config validation
+ checkArgument(!(conf.isBatchingEnabled() && conf.isChunkingEnabled()),
+ "Batching and chunking of messages can't be enabled together");
if (conf.getTopicName() == null) {
return FutureUtil
.failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder"));
@@ -187,6 +190,12 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
}
@Override
+ public ProducerBuilder<T> enableChunking(boolean chunkingEnabled) {
+ conf.setChunkingEnabled(chunkingEnabled);
+ return this;
+ }
+
+ @Override
public ProducerBuilder<T> cryptoKeyReader(@NonNull CryptoKeyReader cryptoKeyReader) {
conf.setCryptoKeyReader(cryptoKeyReader);
return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index c249d8f..8f6f59c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -39,12 +39,15 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.ScheduledFuture;
+
+import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
@@ -66,9 +69,11 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
@@ -365,7 +370,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
// validate msg-size (For batching this will be check at the batch completion size)
int compressedSize = compressedPayload.readableBytes();
- if (compressedSize > ClientCnx.getMaxMessageSize()) {
+ if (compressedSize > ClientCnx.getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
compressedPayload.release();
String compressedStr = (!isBatchMessagingEnabled() && conf.getCompressionType() != CompressionType.NONE)
? "Compressed"
@@ -392,95 +397,143 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
return;
}
+ // send in chunks
+ int totalChunks = canAddToBatch(msg) ? 1
+ : Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()
+ + (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
+ // each chunk is also sent as an individual message, so try to acquire a send-permit per extra chunk
+ for (int i = 0; i < (totalChunks - 1); i++) {
+ if (!canEnqueueRequest(callback, message.getSequenceId())) {
+ return;
+ }
+ }
+
try {
synchronized (this) {
- long sequenceId;
- if (!msgMetadataBuilder.hasSequenceId()) {
- sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
- msgMetadataBuilder.setSequenceId(sequenceId);
- } else {
- sequenceId = msgMetadataBuilder.getSequenceId();
+ int readStartIndex = 0;
+ String uuid = UUID.randomUUID().toString();
+ for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+ serializeAndSendMessage(msg, msgMetadataBuilder, payload, uuid, chunkId, totalChunks,
+ readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload,
+ compressedPayload.readableBytes(), uncompressedSize, callback);
+ readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());
}
- if (!msgMetadataBuilder.hasPublishTime()) {
- msgMetadataBuilder.setPublishTime(client.getClientClock().millis());
+ }
+ } catch (PulsarClientException e) {
+ e.setSequenceId(msg.getSequenceId());
+ completeCallbackAndReleaseSemaphore(callback, e);
+ } catch (Throwable t) {
+ completeCallbackAndReleaseSemaphore(callback, new PulsarClientException(t, msg.getSequenceId()));
+ }
+ }
- checkArgument(!msgMetadataBuilder.hasProducerName());
+ private void serializeAndSendMessage(MessageImpl<?> msg, Builder msgMetadataBuilder, ByteBuf payload,
+ String uuid, int chunkId, int totalChunks, int readStartIndex, int chunkMaxSizeInBytes, ByteBuf compressedPayload,
+ int compressedPayloadSize,
+ int uncompressedSize, SendCallback callback) throws IOException, InterruptedException {
+ ByteBuf chunkPayload = compressedPayload;
+ Builder chunkMsgMetadataBuilder = msgMetadataBuilder;
+ if (totalChunks > 1 && TopicName.get(topic).isPersistent()) {
+ chunkPayload = compressedPayload.slice(readStartIndex,
+ Math.min(chunkMaxSizeInBytes, chunkPayload.readableBytes() - readStartIndex));
+ // don't retain the last chunk's payload and builder as they will not be needed for the next chunk-iteration
+ // and will be released once this chunk-message is sent
+ if (chunkId != totalChunks - 1) {
+ chunkPayload.retain();
+ chunkMsgMetadataBuilder = msgMetadataBuilder.clone();
+ }
+ chunkMsgMetadataBuilder.setUuid(uuid);
+ chunkMsgMetadataBuilder.setChunkId(chunkId);
+ chunkMsgMetadataBuilder.setNumChunksFromMsg(totalChunks);
+ chunkMsgMetadataBuilder.setTotalChunkMsgSize(compressedPayloadSize);
+ }
+ long sequenceId;
+ if (!chunkMsgMetadataBuilder.hasSequenceId()) {
+ sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
+ chunkMsgMetadataBuilder.setSequenceId(sequenceId);
+ } else {
+ sequenceId = chunkMsgMetadataBuilder.getSequenceId();
+ }
+ if (!chunkMsgMetadataBuilder.hasPublishTime()) {
+ chunkMsgMetadataBuilder.setPublishTime(client.getClientClock().millis());
- msgMetadataBuilder.setProducerName(producerName);
+ checkArgument(!chunkMsgMetadataBuilder.hasProducerName());
- if (conf.getCompressionType() != CompressionType.NONE) {
- msgMetadataBuilder.setCompression(
- CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
- }
- msgMetadataBuilder.setUncompressedSize(uncompressedSize);
- }
- if (canAddToBatch(msg)) {
- if (canAddToCurrentBatch(msg)) {
- // should trigger complete the batch message, new message will add to a new batch and new batch
- // sequence id use the new message, so that broker can handle the message duplication
- if (sequenceId <= lastSequenceIdPushed) {
- isLastSequenceIdPotentialDuplicated = true;
- if (sequenceId <= lastSequenceIdPublished) {
- log.warn("Message with sequence id {} is definitely a duplicate", sequenceId);
- } else {
- log.info("Message with sequence id {} might be a duplicate but cannot be determined at this time.",
- sequenceId);
- }
- doBatchSendAndAdd(msg, callback, payload);
- } else {
- // Should flush the last potential duplicated since can't combine potential duplicated messages
- // and non-duplicated messages into a batch.
- if (isLastSequenceIdPotentialDuplicated) {
- doBatchSendAndAdd(msg, callback, payload);
- } else {
- // handle boundary cases where message being added would exceed
- // batch size and/or max message size
- boolean isBatchFull = batchMessageContainer.add(msg, callback);
- lastSendFuture = callback.getFuture();
- payload.release();
- if (isBatchFull) {
- batchMessageAndSend();
- }
- }
- isLastSequenceIdPotentialDuplicated = false;
- }
+ chunkMsgMetadataBuilder.setProducerName(producerName);
+
+ if (conf.getCompressionType() != CompressionType.NONE) {
+ chunkMsgMetadataBuilder
+ .setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
+ }
+ chunkMsgMetadataBuilder.setUncompressedSize(uncompressedSize);
+ }
+
+ if (canAddToBatch(msg) && totalChunks <= 1) {
+ if (canAddToCurrentBatch(msg)) {
+ // should trigger complete the batch message, new message will add to a new batch and new batch
+ // sequence id use the new message, so that broker can handle the message duplication
+ if (sequenceId <= lastSequenceIdPushed) {
+ isLastSequenceIdPotentialDuplicated = true;
+ if (sequenceId <= lastSequenceIdPublished) {
+ log.warn("Message with sequence id {} is definitely a duplicate", sequenceId);
} else {
- doBatchSendAndAdd(msg, callback, payload);
+ log.info("Message with sequence id {} might be a duplicate but cannot be determined at this time.",
+ sequenceId);
}
+ doBatchSendAndAdd(msg, callback, payload);
} else {
- ByteBuf encryptedPayload = encryptMessage(msgMetadataBuilder, compressedPayload);
- // When publishing during replication, we need to set the correct number of message in batch
- // This is only used in tracking the publish rate stats
- int numMessages = msg.getMessageBuilder().hasNumMessagesInBatch()
- ? msg.getMessageBuilder().getNumMessagesInBatch()
- : 1;
- final OpSendMsg op;
- if (msg.getSchemaState() == MessageImpl.SchemaState.Ready) {
- MessageMetadata msgMetadata = msgMetadataBuilder.build();
- ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, msgMetadata, encryptedPayload);
- op = OpSendMsg.create(msg, cmd, sequenceId, callback);
- msgMetadataBuilder.recycle();
- msgMetadata.recycle();
+ // Should flush the last potential duplicated since can't combine potential duplicated messages
+ // and non-duplicated messages into a batch.
+ if (isLastSequenceIdPotentialDuplicated) {
+ doBatchSendAndAdd(msg, callback, payload);
} else {
- op = OpSendMsg.create(msg, null, sequenceId, callback);
- op.rePopulate = () -> {
- MessageMetadata msgMetadata = msgMetadataBuilder.build();
- op.cmd = sendMessage(producerId, sequenceId, numMessages, msgMetadata, encryptedPayload);
- msgMetadataBuilder.recycle();
- msgMetadata.recycle();
- };
+ // handle boundary cases where message being added would exceed
+ // batch size and/or max message size
+ boolean isBatchFull = batchMessageContainer.add(msg, callback);
+ lastSendFuture = callback.getFuture();
+ payload.release();
+ if (isBatchFull) {
+ batchMessageAndSend();
+ }
}
- op.setNumMessagesInBatch(numMessages);
- op.setBatchSizeByte(encryptedPayload.readableBytes());
- lastSendFuture = callback.getFuture();
- processOpSendMsg(op);
+ isLastSequenceIdPotentialDuplicated = false;
}
+ } else {
+ doBatchSendAndAdd(msg, callback, payload);
}
- } catch (PulsarClientException e) {
- e.setSequenceId(msg.getSequenceId());
- completeCallbackAndReleaseSemaphore(callback, e);
- } catch (Throwable t) {
- completeCallbackAndReleaseSemaphore(callback, new PulsarClientException(t, msg.getSequenceId()));
+ } else {
+ ByteBuf encryptedPayload = encryptMessage(chunkMsgMetadataBuilder, chunkPayload);
+
+ MessageMetadata msgMetadata = chunkMsgMetadataBuilder.build();
+ // When publishing during replication, we need to set the correct number of message in batch
+ // This is only used in tracking the publish rate stats
+ int numMessages = msg.getMessageBuilder().hasNumMessagesInBatch()
+ ? msg.getMessageBuilder().getNumMessagesInBatch()
+ : 1;
+ final OpSendMsg op;
+ if (msg.getSchemaState() == MessageImpl.SchemaState.Ready) {
+ ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, msgMetadata, encryptedPayload);
+ op = OpSendMsg.create(msg, cmd, sequenceId, callback);
+ chunkMsgMetadataBuilder.recycle();
+ msgMetadata.recycle();
+ } else {
+ op = OpSendMsg.create(msg, null, sequenceId, callback);
+ final Builder tmpBuilder = chunkMsgMetadataBuilder;
+ op.rePopulate = () -> {
+ MessageMetadata metadata = msgMetadataBuilder.build();
+ op.cmd = sendMessage(producerId, sequenceId, numMessages, metadata, encryptedPayload);
+ tmpBuilder.recycle();
+ msgMetadata.recycle();
+ };
+ }
+ op.setNumMessagesInBatch(numMessages);
+ op.setBatchSizeByte(encryptedPayload.readableBytes());
+ if (totalChunks > 1) {
+ op.totalChunks = totalChunks;
+ op.chunkId = chunkId;
+ }
+ lastSendFuture = callback.getFuture();
+ processOpSendMsg(op);
}
}
@@ -876,9 +929,18 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
last -> Math.max(last, getHighestSequenceId(finalOp)));
op.setMessageId(ledgerId, entryId, partitionIndex);
try {
- // Need to protect ourselves from any exception being thrown in the future handler from the
- // application
- op.callback.sendComplete(null);
+ // if message is chunked then call callback only on last chunk
+ if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) {
+ try {
+
+ // Need to protect ourselves from any exception being thrown in the future handler from the
+ // application
+ op.callback.sendComplete(null);
+ } catch (Throwable t) {
+ log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic,
+ producerName, sequenceId, t);
+ }
+ }
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
sequenceId, t);
@@ -1010,6 +1072,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
long batchSizeByte = 0;
int numMessagesInBatch = 1;
long highestSequenceId;
+ int totalChunks = 0;
+ int chunkId = -1;
static OpSendMsg create(MessageImpl<?> msg, ByteBufPair cmd, long sequenceId, SendCallback callback) {
OpSendMsg op = RECYCLER.get();
@@ -1052,6 +1116,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
sequenceId = -1L;
createdAt = -1L;
highestSequenceId = -1L;
+ totalChunks = 0;
+ chunkId = -1;
recyclerHandle.recycle(this);
}
@@ -1396,7 +1462,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
// Need to protect ourselves from any exception being thrown in the future handler from the
// application
ex.setSequenceId(op.sequenceId);
- op.callback.sendComplete(ex);
+ // if message is chunked then call callback only on last chunk
+ if (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) {
+ // Need to protect ourselves from any exception being thrown in the future handler from the
+ // application
+ op.callback.sendComplete(ex);
+ }
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
op.sequenceId, t);
@@ -1511,7 +1582,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
}
- private void processOpSendMsg(OpSendMsg op) {
+ protected void processOpSendMsg(OpSendMsg op) {
if (op == null) {
return;
}
@@ -1672,4 +1743,4 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);
-}
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 3e85d48..dfce149 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -131,6 +131,7 @@ public class UnAckedMessageTracker implements Closeable {
if (!headPartition.isEmpty()) {
log.warn("[{}] {} messages have timed-out", consumerBase, headPartition.size());
headPartition.forEach(messageId -> {
+ addChunkedMessageIdsAndRemoveFromSequnceMap(messageId, messageIds, consumerBase);
messageIds.add(messageId);
messageIdPartitionMap.remove(messageId);
});
@@ -150,6 +151,19 @@ public class UnAckedMessageTracker implements Closeable {
}, this.tickDurationInMs, TimeUnit.MILLISECONDS);
}
+ public static void addChunkedMessageIdsAndRemoveFromSequnceMap(MessageId messageId, Set<MessageId> messageIds,
+ ConsumerBase<?> consumerBase) {
+ if (messageId instanceof MessageIdImpl) {
+ MessageIdImpl[] chunkedMsgIds = consumerBase.unAckedChunckedMessageIdSequenceMap.get((MessageIdImpl) messageId);
+ if (chunkedMsgIds != null && chunkedMsgIds.length > 0) {
+ for (MessageIdImpl msgId : chunkedMsgIds) {
+ messageIds.add(msgId);
+ }
+ }
+ consumerBase.unAckedChunckedMessageIdSequenceMap.remove((MessageIdImpl) messageId);
+ }
+ }
+
public void clear() {
writeLock.lock();
try {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 1314327..eb82703 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -84,6 +84,13 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private long tickDurationMillis = 1000;
private int priorityLevel = 0;
+
+ // max pending chunked message to avoid sitting incomplete message into the queue and memory
+ private int maxPendingChuckedMessage = 10;
+
+ private boolean autoAckOldestChunkedMessageOnQueueFull = false;
+
+ private long expireTimeOfIncompleteChunkedMessageMillis = 60 * 1000;
@JsonIgnore
private CryptoKeyReader cryptoKeyReader = null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index b6a0763..fd2f678 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -77,6 +77,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
private boolean batchingEnabled = true; // enabled by default
@JsonIgnore
private BatcherBuilder batcherBuilder = BatcherBuilder.DEFAULT;
+ private boolean chunkingEnabled = false;
@JsonIgnore
private CryptoKeyReader cryptoKeyReader;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index fc6df91..eac424e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -47,6 +48,7 @@ public class AcknowledgementsGroupingTrackerTest {
public void setup() {
eventLoopGroup = new NioEventLoopGroup(1);
consumer = mock(ConsumerImpl.class);
+ consumer.unAckedChunckedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
cnx = mock(ClientCnx.class);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(cnx.ctx()).thenReturn(ctx);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index d3b9f3c..9691cc9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -3702,6 +3702,22 @@ public final class PulsarApi {
// optional bool null_value = 25 [default = false];
boolean hasNullValue();
boolean getNullValue();
+
+ // optional string uuid = 26;
+ boolean hasUuid();
+ String getUuid();
+
+ // optional int32 num_chunks_from_msg = 27;
+ boolean hasNumChunksFromMsg();
+ int getNumChunksFromMsg();
+
+ // optional int32 total_chunk_msg_size = 28;
+ boolean hasTotalChunkMsgSize();
+ int getTotalChunkMsgSize();
+
+ // optional int32 chunk_id = 29;
+ boolean hasChunkId();
+ int getChunkId();
}
public static final class MessageMetadata extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -4082,6 +4098,68 @@ public final class PulsarApi {
return nullValue_;
}
+ // optional string uuid = 26;
+ public static final int UUID_FIELD_NUMBER = 26;
+ private java.lang.Object uuid_;
+ public boolean hasUuid() {
+ return ((bitField0_ & 0x00100000) == 0x00100000);
+ }
+ public String getUuid() {
+ java.lang.Object ref = uuid_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs =
+ (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) {
+ uuid_ = s;
+ }
+ return s;
+ }
+ }
+ private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getUuidBytes() {
+ java.lang.Object ref = uuid_;
+ if (ref instanceof String) {
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b =
+ org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String) ref);
+ uuid_ = b;
+ return b;
+ } else {
+ return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+ }
+ }
+
+ // optional int32 num_chunks_from_msg = 27;
+ public static final int NUM_CHUNKS_FROM_MSG_FIELD_NUMBER = 27;
+ private int numChunksFromMsg_;
+ public boolean hasNumChunksFromMsg() {
+ return ((bitField0_ & 0x00200000) == 0x00200000);
+ }
+ public int getNumChunksFromMsg() {
+ return numChunksFromMsg_;
+ }
+
+ // optional int32 total_chunk_msg_size = 28;
+ public static final int TOTAL_CHUNK_MSG_SIZE_FIELD_NUMBER = 28;
+ private int totalChunkMsgSize_;
+ public boolean hasTotalChunkMsgSize() {
+ return ((bitField0_ & 0x00400000) == 0x00400000);
+ }
+ public int getTotalChunkMsgSize() {
+ return totalChunkMsgSize_;
+ }
+
+ // optional int32 chunk_id = 29;
+ public static final int CHUNK_ID_FIELD_NUMBER = 29;
+ private int chunkId_;
+ public boolean hasChunkId() {
+ return ((bitField0_ & 0x00800000) == 0x00800000);
+ }
+ public int getChunkId() {
+ return chunkId_;
+ }
+
private void initFields() {
producerName_ = "";
sequenceId_ = 0L;
@@ -4106,6 +4184,10 @@ public final class PulsarApi {
txnidMostBits_ = 0L;
highestSequenceId_ = 0L;
nullValue_ = false;
+ uuid_ = "";
+ numChunksFromMsg_ = 0;
+ totalChunkMsgSize_ = 0;
+ chunkId_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -4217,6 +4299,18 @@ public final class PulsarApi {
if (((bitField0_ & 0x00080000) == 0x00080000)) {
output.writeBool(25, nullValue_);
}
+ if (((bitField0_ & 0x00100000) == 0x00100000)) {
+ output.writeBytes(26, getUuidBytes());
+ }
+ if (((bitField0_ & 0x00200000) == 0x00200000)) {
+ output.writeInt32(27, numChunksFromMsg_);
+ }
+ if (((bitField0_ & 0x00400000) == 0x00400000)) {
+ output.writeInt32(28, totalChunkMsgSize_);
+ }
+ if (((bitField0_ & 0x00800000) == 0x00800000)) {
+ output.writeInt32(29, chunkId_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -4322,6 +4416,22 @@ public final class PulsarApi {
size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeBoolSize(25, nullValue_);
}
+ if (((bitField0_ & 0x00100000) == 0x00100000)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeBytesSize(26, getUuidBytes());
+ }
+ if (((bitField0_ & 0x00200000) == 0x00200000)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeInt32Size(27, numChunksFromMsg_);
+ }
+ if (((bitField0_ & 0x00400000) == 0x00400000)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeInt32Size(28, totalChunkMsgSize_);
+ }
+ if (((bitField0_ & 0x00800000) == 0x00800000)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeInt32Size(29, chunkId_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -4481,6 +4591,14 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00200000);
nullValue_ = false;
bitField0_ = (bitField0_ & ~0x00400000);
+ uuid_ = "";
+ bitField0_ = (bitField0_ & ~0x00800000);
+ numChunksFromMsg_ = 0;
+ bitField0_ = (bitField0_ & ~0x01000000);
+ totalChunkMsgSize_ = 0;
+ bitField0_ = (bitField0_ & ~0x02000000);
+ chunkId_ = 0;
+ bitField0_ = (bitField0_ & ~0x04000000);
return this;
}
@@ -4610,6 +4728,22 @@ public final class PulsarApi {
to_bitField0_ |= 0x00080000;
}
result.nullValue_ = nullValue_;
+ if (((from_bitField0_ & 0x00800000) == 0x00800000)) {
+ to_bitField0_ |= 0x00100000;
+ }
+ result.uuid_ = uuid_;
+ if (((from_bitField0_ & 0x01000000) == 0x01000000)) {
+ to_bitField0_ |= 0x00200000;
+ }
+ result.numChunksFromMsg_ = numChunksFromMsg_;
+ if (((from_bitField0_ & 0x02000000) == 0x02000000)) {
+ to_bitField0_ |= 0x00400000;
+ }
+ result.totalChunkMsgSize_ = totalChunkMsgSize_;
+ if (((from_bitField0_ & 0x04000000) == 0x04000000)) {
+ to_bitField0_ |= 0x00800000;
+ }
+ result.chunkId_ = chunkId_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -4706,6 +4840,18 @@ public final class PulsarApi {
if (other.hasNullValue()) {
setNullValue(other.getNullValue());
}
+ if (other.hasUuid()) {
+ setUuid(other.getUuid());
+ }
+ if (other.hasNumChunksFromMsg()) {
+ setNumChunksFromMsg(other.getNumChunksFromMsg());
+ }
+ if (other.hasTotalChunkMsgSize()) {
+ setTotalChunkMsgSize(other.getTotalChunkMsgSize());
+ }
+ if (other.hasChunkId()) {
+ setChunkId(other.getChunkId());
+ }
return this;
}
@@ -4880,6 +5026,26 @@ public final class PulsarApi {
nullValue_ = input.readBool();
break;
}
+ case 210: {
+ bitField0_ |= 0x00800000;
+ uuid_ = input.readBytes();
+ break;
+ }
+ case 216: {
+ bitField0_ |= 0x01000000;
+ numChunksFromMsg_ = input.readInt32();
+ break;
+ }
+ case 224: {
+ bitField0_ |= 0x02000000;
+ totalChunkMsgSize_ = input.readInt32();
+ break;
+ }
+ case 232: {
+ bitField0_ |= 0x04000000;
+ chunkId_ = input.readInt32();
+ break;
+ }
}
}
}
@@ -5612,6 +5778,105 @@ public final class PulsarApi {
return this;
}
+ // optional string uuid = 26;
+ private java.lang.Object uuid_ = "";
+ public boolean hasUuid() {
+ return ((bitField0_ & 0x00800000) == 0x00800000);
+ }
+ public String getUuid() {
+ java.lang.Object ref = uuid_;
+ if (!(ref instanceof String)) {
+ String s = ((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref).toStringUtf8();
+ uuid_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setUuid(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00800000;
+ uuid_ = value;
+
+ return this;
+ }
+ public Builder clearUuid() {
+ bitField0_ = (bitField0_ & ~0x00800000);
+ uuid_ = getDefaultInstance().getUuid();
+
+ return this;
+ }
+ void setUuid(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) {
+ bitField0_ |= 0x00800000;
+ uuid_ = value;
+
+ }
+
+ // optional int32 num_chunks_from_msg = 27;
+ private int numChunksFromMsg_ ;
+ public boolean hasNumChunksFromMsg() {
+ return ((bitField0_ & 0x01000000) == 0x01000000);
+ }
+ public int getNumChunksFromMsg() {
+ return numChunksFromMsg_;
+ }
+ public Builder setNumChunksFromMsg(int value) {
+ bitField0_ |= 0x01000000;
+ numChunksFromMsg_ = value;
+
+ return this;
+ }
+ public Builder clearNumChunksFromMsg() {
+ bitField0_ = (bitField0_ & ~0x01000000);
+ numChunksFromMsg_ = 0;
+
+ return this;
+ }
+
+ // optional int32 total_chunk_msg_size = 28;
+ private int totalChunkMsgSize_ ;
+ public boolean hasTotalChunkMsgSize() {
+ return ((bitField0_ & 0x02000000) == 0x02000000);
+ }
+ public int getTotalChunkMsgSize() {
+ return totalChunkMsgSize_;
+ }
+ public Builder setTotalChunkMsgSize(int value) {
+ bitField0_ |= 0x02000000;
+ totalChunkMsgSize_ = value;
+
+ return this;
+ }
+ public Builder clearTotalChunkMsgSize() {
+ bitField0_ = (bitField0_ & ~0x02000000);
+ totalChunkMsgSize_ = 0;
+
+ return this;
+ }
+
+ // optional int32 chunk_id = 29;
+ private int chunkId_ ;
+ public boolean hasChunkId() {
+ return ((bitField0_ & 0x04000000) == 0x04000000);
+ }
+ public int getChunkId() {
+ return chunkId_;
+ }
+ public Builder setChunkId(int value) {
+ bitField0_ |= 0x04000000;
+ chunkId_ = value;
+
+ return this;
+ }
+ public Builder clearChunkId() {
+ bitField0_ = (bitField0_ & ~0x04000000);
+ chunkId_ = 0;
+
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.MessageMetadata)
}
@@ -16125,6 +16390,10 @@ public final class PulsarApi {
// optional uint64 highest_sequence_id = 6 [default = 0];
boolean hasHighestSequenceId();
long getHighestSequenceId();
+
+ // optional bool is_chunk = 7 [default = false];
+ boolean hasIsChunk();
+ boolean getIsChunk();
}
public static final class CommandSend extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -16221,6 +16490,16 @@ public final class PulsarApi {
return highestSequenceId_;
}
+ // optional bool is_chunk = 7 [default = false];
+ public static final int IS_CHUNK_FIELD_NUMBER = 7;
+ private boolean isChunk_;
+ public boolean hasIsChunk() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ public boolean getIsChunk() {
+ return isChunk_;
+ }
+
private void initFields() {
producerId_ = 0L;
sequenceId_ = 0L;
@@ -16228,6 +16507,7 @@ public final class PulsarApi {
txnidLeastBits_ = 0L;
txnidMostBits_ = 0L;
highestSequenceId_ = 0L;
+ isChunk_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -16272,6 +16552,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000020) == 0x00000020)) {
output.writeUInt64(6, highestSequenceId_);
}
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ output.writeBool(7, isChunk_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -16304,6 +16587,10 @@ public final class PulsarApi {
size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeUInt64Size(6, highestSequenceId_);
}
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeBoolSize(7, isChunk_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -16429,6 +16716,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000010);
highestSequenceId_ = 0L;
bitField0_ = (bitField0_ & ~0x00000020);
+ isChunk_ = false;
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -16486,6 +16775,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000020;
}
result.highestSequenceId_ = highestSequenceId_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000040;
+ }
+ result.isChunk_ = isChunk_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -16510,6 +16803,9 @@ public final class PulsarApi {
if (other.hasHighestSequenceId()) {
setHighestSequenceId(other.getHighestSequenceId());
}
+ if (other.hasIsChunk()) {
+ setIsChunk(other.getIsChunk());
+ }
return this;
}
@@ -16577,6 +16873,11 @@ public final class PulsarApi {
highestSequenceId_ = input.readUInt64();
break;
}
+ case 56: {
+ bitField0_ |= 0x00000040;
+ isChunk_ = input.readBool();
+ break;
+ }
}
}
}
@@ -16709,6 +17010,27 @@ public final class PulsarApi {
return this;
}
+ // optional bool is_chunk = 7 [default = false];
+ private boolean isChunk_ ;
+ public boolean hasIsChunk() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ public boolean getIsChunk() {
+ return isChunk_;
+ }
+ public Builder setIsChunk(boolean value) {
+ bitField0_ |= 0x00000040;
+ isChunk_ = value;
+
+ return this;
+ }
+ public Builder clearIsChunk() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ isChunk_ = false;
+
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSend)
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index 460c50a..357407a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -41,6 +41,9 @@ public class ConsumerStats {
/** Total rate of messages redelivered by this consumer (msg/s). */
public double msgRateRedeliver;
+ /** Total chunked messages dispatched. */
+ public double chuckedMessageRate;
+
/** Name of the consumer. */
public String consumerName;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPublisherStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPublisherStats.java
index 0d661dd..f1602a4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPublisherStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPublisherStats.java
@@ -36,4 +36,4 @@ public class NonPersistentPublisherStats extends PublisherStats {
this.msgDropRate += stats.msgDropRate;
return this;
}
-}
+}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
index cec2257..c500033 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PublisherStats.java
@@ -37,6 +37,9 @@ public class PublisherStats {
/** Average message size published by this publisher. */
public double averageMsgSize;
+ /** total chunked message count received. **/
+ public double chunkedMessageRate;
+
/** Id of this publisher. */
public long producerId;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 6994632..18002b5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -43,6 +43,9 @@ public class SubscriptionStats {
/** Total rate of messages redelivered on this subscription (msg/s). */
public double msgRateRedeliver;
+ /** Chunked message dispatch rate. */
+ public int chuckedMessageRate;
+
/** Number of messages in the subscription backlog. */
public long msgBacklog;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index be471e3..8d560a9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -58,6 +58,9 @@ public class TopicStats {
/** Average size of published messages (bytes). */
public double averageMsgSize;
+ /** Topic has chunked message published on it. */
+ public boolean msgChunkPublished;
+
/** Space used to store the messages for the topic (bytes). */
public long storageSize;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index cbc4a39..d5e772a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -488,6 +488,9 @@ public class Commands {
if (txnIdMostBits > 0) {
sendBuilder.setTxnidMostBits(txnIdMostBits);
}
+ if (messageData.hasTotalChunkMsgSize() && messageData.getTotalChunkMsgSize() > 1) {
+ sendBuilder.setIsChunk(true);
+ }
CommandSend send = sendBuilder.build();
ByteBufPair res = serializeCommandSendWithSize(BaseCommand.newBuilder().setType(Type.SEND).setSend(send),
@@ -513,6 +516,9 @@ public class Commands {
if (txnIdMostBits > 0) {
sendBuilder.setTxnidMostBits(txnIdMostBits);
}
+ if (messageData.hasTotalChunkMsgSize() && messageData.getTotalChunkMsgSize() > 1) {
+ sendBuilder.setIsChunk(true);
+ }
CommandSend send = sendBuilder.build();
ByteBufPair res = serializeCommandSendWithSize(BaseCommand.newBuilder().setType(Type.SEND).setSend(send),
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index a0aa8a7..cd7b760 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -139,6 +139,10 @@ message MessageMetadata {
// Indicate if the message payload value is set
optional bool null_value = 25 [ default = false ];
+ optional string uuid = 26;
+ optional int32 num_chunks_from_msg = 27;
+ optional int32 total_chunk_msg_size = 28;
+ optional int32 chunk_id = 29;
}
message SingleMessageMetadata {
@@ -439,6 +443,7 @@ message CommandSend {
/// Add highest sequence id to support batch message with external sequence id
optional uint64 highest_sequence_id = 6 [default = 0];
+ optional bool is_chunk =7 [default = false];
}
message CommandSendReceipt {
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index b667eff..61b027d 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -119,6 +119,17 @@ public class PerformanceConsumer {
@Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name")
public String authPluginClassName;
+ @Parameter(names = { "-mc", "--max_chunked_msg" }, description = "Max pending chunk messages")
+ private int maxPendingChuckedMessage = 0;
+
+ @Parameter(names = { "-ac",
+ "--auto_ack_chunk_q_full" }, description = "Auto ack for oldest message on queue is full")
+ private boolean autoAckOldestChunkedMessageOnQueueFull = false;
+
+ @Parameter(names = { "-e",
+ "--expire_time_incomplete_chunked_messages" }, description = "Expire time in ms for incomplete chunk messages")
+ private long expireTimeOfIncompleteChunkedMessageMs = 0;
+
@Parameter(
names = { "--auth-params" },
description = "Authentication parameters, whose format is determined by the implementation " +
@@ -274,7 +285,15 @@ public class PerformanceConsumer {
.acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) //
.subscriptionType(arguments.subscriptionType)
.subscriptionInitialPosition(arguments.subscriptionInitialPosition)
+ .autoAckOldestChunkedMessageOnQueueFull(arguments.autoAckOldestChunkedMessageOnQueueFull)
.replicateSubscriptionState(arguments.replicatedSubscription);
+ if (arguments.maxPendingChuckedMessage > 0) {
+ consumerBuilder.maxPendingChuckedMessage(arguments.maxPendingChuckedMessage);
+ }
+ if (arguments.expireTimeOfIncompleteChunkedMessageMs > 0) {
+ consumerBuilder.expireTimeOfIncompleteChunkedMessage(arguments.expireTimeOfIncompleteChunkedMessageMs,
+ TimeUnit.MILLISECONDS);
+ }
if (arguments.encKeyName != null) {
byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile));
@@ -371,4 +390,4 @@ public class PerformanceConsumer {
}
private static final Logger log = LoggerFactory.getLogger(PerformanceConsumer.class);
-}
+}
\ No newline at end of file
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 0bd40c3..9e9d0e8 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -122,6 +122,10 @@ public class PerformanceProducer {
@Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name")
public String authPluginClassName;
+ @Parameter(names = { "-ch",
+ "--chunking" }, description = "Should split the message and publish in chunks if message size is larger than allowed max size")
+ private boolean chunkingAllowed = false;
+
@Parameter(
names = { "--auth-params" },
description = "Authentication parameters, whose format is determined by the implementation " +
@@ -449,7 +453,12 @@ public class PerformanceProducer {
log.info("Adding {} publishers on topic {}", arguments.numProducers, topic);
for (int j = 0; j < arguments.numProducers; j++) {
- futures.add(producerBuilder.clone().topic(topic).createAsync());
+ ProducerBuilder<byte[]> prodBuilder = producerBuilder.clone().topic(topic);
+ if (arguments.chunkingAllowed) {
+ prodBuilder.enableChunking(true);
+ prodBuilder.enableBatching(false);
+ }
+ futures.add(prodBuilder.createAsync());
}
}
@@ -578,4 +587,4 @@ public class PerformanceProducer {
static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
static final DecimalFormat totalFormat = new DecimalFormat("0.000");
private static final Logger log = LoggerFactory.getLogger(PerformanceProducer.class);
-}
+}
\ No newline at end of file