You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/06/19 20:47:54 UTC
[pulsar] branch master updated: Introduce batch message container
framework and support key based batching container (#4435)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b45736a Introduce batch message container framework and support key based batching container (#4435)
b45736a is described below
commit b45736ad0738116c9c3cae27ed18f1342b55139e
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jun 20 04:47:48 2019 +0800
Introduce batch message container framework and support key based batching container (#4435)
### Motivation
Introduce a batch message container framework to support multiple ways of batching messages.
Currently, Pulsar supports only the most basic batch message container. With the batch message container framework, other types of batch message containers can be implemented quickly, and users can even customize their own batch message container.
Add a new batch message container named BatchMessageKeyBasedContainer to support batching messages in Key_Shared subscription mode.
---
.../pulsar/broker/service/BatchMessageTest.java | 163 ++++++++++++----
.../client/api/KeySharedSubscriptionTest.java | 167 ++++++----------
.../pulsar/client/api/BatchMessageContainer.java | 64 ++++++
.../apache/pulsar/client/api/BatcherBuilder.java | 56 ++++++
.../apache/pulsar/client/api/ProducerBuilder.java | 12 +-
.../client/internal/DefaultImplementation.java | 13 ++
.../client/impl/AbstractBatchMessageContainer.java | 87 +++++++++
.../client/impl/BatchMessageContainerBase.java | 68 +++++++
...ntainer.java => BatchMessageContainerImpl.java} | 143 +++++++-------
.../client/impl/BatchMessageKeyBasedContainer.java | 217 +++++++++++++++++++++
.../pulsar/client/impl/DefaultBatcherBuilder.java | 30 +++
.../pulsar/client/impl/KeyBasedBatcherBuilder.java | 30 +++
.../pulsar/client/impl/ProducerBuilderImpl.java | 8 +
.../apache/pulsar/client/impl/ProducerImpl.java | 123 +++++-------
.../impl/conf/ProducerConfigurationData.java | 3 +-
.../impl/conf/ConfigurationDataUtilsTest.java | 1 +
.../apache/pulsar/common/protocol/Commands.java | 3 +
.../java/org/apache/pulsar/storm/PulsarBolt.java | 1 +
18 files changed, 914 insertions(+), 275 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 516197d..265e3ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -22,7 +22,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
-import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import com.google.common.collect.Lists;
@@ -30,8 +29,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -39,12 +40,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
@@ -69,16 +72,31 @@ public class BatchMessageTest extends BrokerTestBase {
super.internalCleanup();
}
- @DataProvider(name = "codec")
- public Object[][] codecProvider() {
- return new Object[][] { { CompressionType.NONE }, { CompressionType.LZ4 }, { CompressionType.ZLIB }, };
+ @DataProvider(name = "codecAndContainerBuilder")
+ public Object[][] codecAndContainerBuilderProvider() {
+ return new Object[][] {
+ { CompressionType.NONE, BatcherBuilder.DEFAULT },
+ { CompressionType.LZ4, BatcherBuilder.DEFAULT },
+ { CompressionType.ZLIB, BatcherBuilder.DEFAULT },
+ { CompressionType.NONE, BatcherBuilder.KEY_BASED },
+ { CompressionType.LZ4, BatcherBuilder.KEY_BASED },
+ { CompressionType.ZLIB, BatcherBuilder.KEY_BASED }
+ };
}
- @Test(dataProvider = "codec")
- public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressionType) throws Exception {
+ @DataProvider(name = "containerBuilder")
+ public Object[][] containerBuilderProvider() {
+ return new Object[][] {
+ { BatcherBuilder.DEFAULT },
+ { BatcherBuilder.KEY_BASED }
+ };
+ }
+
+ @Test(dataProvider = "codecAndContainerBuilder")
+ public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressionType, BatcherBuilder builder) throws Exception {
int numMsgs = 50;
int numMsgsInBatch = numMsgs / 2;
- final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSize";
+ final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSize-" + UUID.randomUUID();
final String subscriptionName = "sub-1" + compressionType.toString();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -87,6 +105,7 @@ public class BatchMessageTest extends BrokerTestBase {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).compressionType(compressionType)
.batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
+ .batcherBuilder(builder)
.create();
List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
@@ -117,10 +136,10 @@ public class BatchMessageTest extends BrokerTestBase {
producer.close();
}
- @Test(dataProvider = "codec")
- public void testSimpleBatchProducerWithFixedBatchTime(CompressionType compressionType) throws Exception {
+ @Test(dataProvider = "codecAndContainerBuilder")
+ public void testSimpleBatchProducerWithFixedBatchTime(CompressionType compressionType, BatcherBuilder builder) throws Exception {
int numMsgs = 100;
- final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchTime";
+ final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchTime-" + UUID.randomUUID();
final String subscriptionName = "time-sub-1" + compressionType.toString();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -128,7 +147,9 @@ public class BatchMessageTest extends BrokerTestBase {
consumer.close();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).compressionType(compressionType)
- .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS).enableBatching(true).create();
+ .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS).enableBatching(true)
+ .batcherBuilder(builder)
+ .create();
Random random = new Random();
List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
@@ -151,10 +172,10 @@ public class BatchMessageTest extends BrokerTestBase {
producer.close();
}
- @Test(dataProvider = "codec")
- public void testSimpleBatchProducerWithFixedBatchSizeAndTime(CompressionType compressionType) throws Exception {
+ @Test(dataProvider = "codecAndContainerBuilder")
+ public void testSimpleBatchProducerWithFixedBatchSizeAndTime(CompressionType compressionType, BatcherBuilder builder) throws Exception {
int numMsgs = 100;
- final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSizeAndTime";
+ final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSizeAndTime-" + UUID.randomUUID();
final String subscriptionName = "time-size-sub-1" + compressionType.toString();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -163,6 +184,7 @@ public class BatchMessageTest extends BrokerTestBase {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS).batchingMaxMessages(5)
+ .batcherBuilder(builder)
.compressionType(compressionType).enableBatching(true).create();
Random random = new Random();
@@ -186,11 +208,11 @@ public class BatchMessageTest extends BrokerTestBase {
producer.close();
}
- @Test(dataProvider = "codec")
- public void testBatchProducerWithLargeMessage(CompressionType compressionType) throws Exception {
+ @Test(dataProvider = "codecAndContainerBuilder")
+ public void testBatchProducerWithLargeMessage(CompressionType compressionType, BatcherBuilder builder) throws Exception {
int numMsgs = 50;
int numMsgsInBatch = numMsgs / 2;
- final String topicName = "persistent://prop/ns-abc/testBatchProducerWithLargeMessage";
+ final String topicName = "persistent://prop/ns-abc/testBatchProducerWithLargeMessage-" + UUID.randomUUID();
final String subscriptionName = "large-message-sub-1" + compressionType.toString();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -199,6 +221,7 @@ public class BatchMessageTest extends BrokerTestBase {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).compressionType(compressionType)
.batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
+ .batcherBuilder(builder)
.create();
List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
@@ -238,11 +261,11 @@ public class BatchMessageTest extends BrokerTestBase {
producer.close();
}
- @Test(dataProvider = "codec")
- public void testSimpleBatchProducerConsumer(CompressionType compressionType) throws Exception {
+ @Test(dataProvider = "codecAndContainerBuilder")
+ public void testSimpleBatchProducerConsumer(CompressionType compressionType, BatcherBuilder builder) throws Exception {
int numMsgs = 500;
int numMsgsInBatch = numMsgs / 20;
- final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerConsumer";
+ final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerConsumer-" + UUID.randomUUID();
final String subscriptionName = "pc-sub-1" + compressionType.toString();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -257,6 +280,7 @@ public class BatchMessageTest extends BrokerTestBase {
// disabled size based batch
.batchingMaxMessages(2 * numMsgs)
.enableBatching(true)
+ .batcherBuilder(builder)
.create();
List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
@@ -296,11 +320,11 @@ public class BatchMessageTest extends BrokerTestBase {
producer.close();
}
- @Test
- public void testSimpleBatchSyncProducerWithFixedBatchSize() throws Exception {
+ @Test(dataProvider = "containerBuilder")
+ public void testSimpleBatchSyncProducerWithFixedBatchSize(BatcherBuilder builder) throws Exception {
int numMsgs = 10;
int numMsgsInBatch = numMsgs / 2;
- final String topicName = "persistent://prop/ns-abc/testSimpleBatchSyncProducerWithFixedBatchSize";
+ final String topicName = "persistent://prop/ns-abc/testSimpleBatchSyncProducerWithFixedBatchSize-" + UUID.randomUUID();
final String subscriptionName = "syncsub-1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -309,6 +333,7 @@ public class BatchMessageTest extends BrokerTestBase {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
+ .batcherBuilder(builder)
.create();
for (int i = 0; i < numMsgs; i++) {
@@ -338,11 +363,11 @@ public class BatchMessageTest extends BrokerTestBase {
}
- @Test
- public void testSimpleBatchProducerConsumer1kMessages() throws Exception {
+ @Test(dataProvider = "containerBuilder")
+ public void testSimpleBatchProducerConsumer1kMessages(BatcherBuilder builder) throws Exception {
int numMsgs = 2000;
int numMsgsInBatch = 4;
- final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerConsumer1kMessages";
+ final String topicName = "persistent://prop/ns-abc/testSimpleBatchProducerConsumer1kMessages-" + UUID.randomUUID();
final String subscriptionName = "pc1k-sub-1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -351,6 +376,7 @@ public class BatchMessageTest extends BrokerTestBase {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).maxPendingMessages(numMsgs + 1)
.batchingMaxPublishDelay(30, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
+ .batcherBuilder(builder)
.create();
List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
@@ -465,11 +491,11 @@ public class BatchMessageTest extends BrokerTestBase {
producer.close();
}
- @Test
- public void testNonBatchCumulativeAckAfterBatchPublish() throws Exception {
+ @Test(dataProvider = "containerBuilder")
+ public void testNonBatchCumulativeAckAfterBatchPublish(BatcherBuilder builder) throws Exception {
int numMsgs = 10;
int numMsgsInBatch = numMsgs;
- final String topicName = "persistent://prop/ns-abc/testNonBatchCumulativeAckAfterBatchPublish";
+ final String topicName = "persistent://prop/ns-abc/testNonBatchCumulativeAckAfterBatchPublish-" + UUID.randomUUID();
final String subscriptionName = "nbcaabp-sub-1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -478,6 +504,7 @@ public class BatchMessageTest extends BrokerTestBase {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true)
+ .batcherBuilder(builder)
.create();
// create producer to publish non batch messages
Producer<byte[]> noBatchProducer = pulsarClient.newProducer().topic(topicName).create();
@@ -517,11 +544,11 @@ public class BatchMessageTest extends BrokerTestBase {
noBatchProducer.close();
}
- @Test
- public void testBatchAndNonBatchCumulativeAcks() throws Exception {
+ @Test(dataProvider = "containerBuilder")
+ public void testBatchAndNonBatchCumulativeAcks(BatcherBuilder builder) throws Exception {
int numMsgs = 50;
int numMsgsInBatch = numMsgs / 10;
- final String topicName = "persistent://prop/ns-abc/testBatchAndNonBatchCumulativeAcks";
+ final String topicName = "persistent://prop/ns-abc/testBatchAndNonBatchCumulativeAcks-" + UUID.randomUUID();
final String subscriptionName = "bnb-sub-1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -532,6 +559,7 @@ public class BatchMessageTest extends BrokerTestBase {
.batchingMaxPublishDelay(5, TimeUnit.SECONDS)
.batchingMaxMessages(numMsgsInBatch)
.enableBatching(true)
+ .batcherBuilder(builder)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
// create producer to publish non batch messages
@@ -590,10 +618,10 @@ public class BatchMessageTest extends BrokerTestBase {
*
* @throws Exception
*/
- @Test(timeOut = 3000)
- public void testConcurrentBatchMessageAck() throws Exception {
+ @Test(dataProvider = "containerBuilder", timeOut = 3000)
+ public void testConcurrentBatchMessageAck(BatcherBuilder builder) throws Exception {
int numMsgs = 10;
- final String topicName = "persistent://prop/ns-abc/testConcurrentAck";
+ final String topicName = "persistent://prop/ns-abc/testConcurrentAck-" + UUID.randomUUID();
final String subscriptionName = "sub-1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -602,6 +630,7 @@ public class BatchMessageTest extends BrokerTestBase {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgs).enableBatching(true)
+ .batcherBuilder(builder)
.create();
List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
@@ -644,5 +673,69 @@ public class BatchMessageTest extends BrokerTestBase {
producer.close();
}
+ @Test
+ public void testOrderingOfKeyBasedBatchMessageContainer() throws PulsarClientException, ExecutionException, InterruptedException {
+ final String topicName = "persistent://prop/ns-abc/testKeyBased";
+ final String subscriptionName = "sub-1";
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
+ .batchingMaxMessages(30)
+ .enableBatching(true)
+ .batcherBuilder(BatcherBuilder.KEY_BASED)
+ .create();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+ List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList();
+ String[] keys = new String[]{"key-1", "key-2", "key-3"};
+ for (int i = 0; i < 10; i++) {
+ byte[] message = ("my-message-" + i).getBytes();
+ for (String key : keys) {
+ sendFutureList.add(producer.newMessage().key(key).value(message).sendAsync());
+ }
+ }
+ FutureUtil.waitForAll(sendFutureList).get();
+
+ for (int i = 0; i < 30; i++) {
+ Message<byte[]> received = consumer.receive();
+ if (i < 10) {
+ assertEquals(received.getKey(), "key-1");
+ } else if (i < 20) {
+ assertEquals(received.getKey(), "key-2");
+ } else {
+ assertEquals(received.getKey(), "key-3");
+ }
+ consumer.acknowledge(received);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ byte[] message = ("my-message-" + i).getBytes();
+ for (String key : keys) {
+ sendFutureList.add(producer.newMessage()
+ .key(UUID.randomUUID().toString())
+ .orderingKey(key.getBytes())
+ .value(message)
+ .sendAsync());
+ }
+ }
+ FutureUtil.waitForAll(sendFutureList).get();
+
+ for (int i = 0; i < 30; i++) {
+ Message<byte[]> received = consumer.receive();
+ if (i < 10) {
+ assertEquals(new String(received.getOrderingKey()), "key-1");
+ } else if (i < 20) {
+ assertEquals(new String(received.getOrderingKey()), "key-2");
+ } else {
+ assertEquals(new String(received.getOrderingKey()), "key-3");
+ }
+ consumer.acknowledge(received);
+ }
+
+ consumer.close();
+ producer.close();
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(BatchMessageTest.class);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index a6f357e..494a8c8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.ArrayList;
@@ -37,6 +38,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.assertTrue;
@@ -46,6 +48,13 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionTest.class);
private static final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
+ @DataProvider(name = "batch")
+ public Object[][] batchProvider() {
+ return new Object[][] {
+ { false },
+ { true }
+ };
+ }
@BeforeMethod
@Override
@@ -60,40 +69,22 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
super.internalCleanup();
}
- @Test
- public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+ @Test(dataProvider = "batch")
+ public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(true);
- String topic = "persistent://public/default/key_shared";
+ String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
@Cleanup
- Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(3, TimeUnit.SECONDS)
- .subscribe();
+ Consumer<Integer> consumer1 = createConsumer(topic);
@Cleanup
- Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(3, TimeUnit.SECONDS)
- .subscribe();
+ Consumer<Integer> consumer2 = createConsumer(topic);
@Cleanup
- Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(3, TimeUnit.SECONDS)
- .subscribe();
+ Consumer<Integer> consumer3 = createConsumer(topic);
@Cleanup
- Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
- .topic(topic)
- .enableBatching(false)
- .create();
+ Producer<Integer> producer = createProducer(topic, enableBatch);
int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
int consumer2Slot = consumer1Slot >> 1;
@@ -129,41 +120,23 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
receiveAndCheck(checkList);
}
- @Test
- public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException, InterruptedException {
+ @Test(dataProvider = "batch")
+ public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException, InterruptedException {
this.conf.setSubscriptionKeySharedEnable(true);
- String topic = "persistent://public/default/key_shared_consumer_crash";
+ String topic = "persistent://public/default/key_shared_consumer_crash-" + UUID.randomUUID();
@Cleanup
- Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(3, TimeUnit.SECONDS)
- .subscribe();
+ Consumer<Integer> consumer1 = createConsumer(topic);
@Cleanup
- Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(3, TimeUnit.SECONDS)
- .subscribe();
+ Consumer<Integer> consumer2 = createConsumer(topic);
@Cleanup
- Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(3, TimeUnit.SECONDS)
- .subscribe();
+ Consumer<Integer> consumer3 = createConsumer(topic);
@Cleanup
- Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
- .topic(topic)
- .enableBatching(false)
- .create();
+ Producer<Integer> producer = createProducer(topic, enableBatch);
int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
int consumer2Slot = consumer1Slot >> 1;
@@ -219,45 +192,27 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
}
- @Test
- public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+ @Test(dataProvider = "batch")
+ public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(true);
- String topic = "persistent://public/default/key_shared_none_key";
+ String topic = "persistent://public/default/key_shared_none_key-" + UUID.randomUUID();
@Cleanup
- Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(3, TimeUnit.SECONDS)
- .subscribe();
+ Consumer<Integer> consumer1 = createConsumer(topic);
@Cleanup
- Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(3, TimeUnit.SECONDS)
- .subscribe();
+ Consumer<Integer> consumer2 = createConsumer(topic);
@Cleanup
- Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(3, TimeUnit.SECONDS)
- .subscribe();
+ Consumer<Integer> consumer3 = createConsumer(topic);
+
+ @Cleanup
+ Producer<Integer> producer = createProducer(topic, enableBatch);
int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
int consumer2Slot = consumer1Slot >> 1;
int consumer3Slot = consumer2Slot >> 1;
- @Cleanup
- Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
- .topic(topic)
- .enableBatching(false)
- .create();
-
for (int i = 0; i < 100; i++) {
producer.newMessage()
.value(i)
@@ -276,40 +231,22 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
receiveAndCheck(checkList);
}
- @Test
- public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+ @Test(dataProvider = "batch")
+ public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(true);
- String topic = "persistent://public/default/key_shared_ordering_key";
+ String topic = "persistent://public/default/key_shared_ordering_key-" + UUID.randomUUID();
@Cleanup
- Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(3, TimeUnit.SECONDS)
- .subscribe();
+ Consumer<Integer> consumer1 = createConsumer(topic);
@Cleanup
- Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(3, TimeUnit.SECONDS)
- .subscribe();
+ Consumer<Integer> consumer2 = createConsumer(topic);
@Cleanup
- Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(3, TimeUnit.SECONDS)
- .subscribe();
+ Consumer<Integer> consumer3 = createConsumer(topic);
@Cleanup
- Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
- .topic(topic)
- .enableBatching(false)
- .create();
+ Producer<Integer> producer = createProducer(topic, enableBatch);
int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
int consumer2Slot = consumer1Slot >> 1;
@@ -358,6 +295,32 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
.subscribe();
}
+ private Producer<Integer> createProducer(String topic, boolean enableBatch) throws PulsarClientException {
+ Producer<Integer> producer = null;
+ if (enableBatch) {
+ producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .enableBatching(true)
+ .batcherBuilder(BatcherBuilder.KEY_BASED)
+ .create();
+ } else {
+ producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+ }
+ return producer;
+ }
+
+ private Consumer<Integer> createConsumer(String topic) throws PulsarClientException {
+ return pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("key_shared")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .ackTimeout(3, TimeUnit.SECONDS)
+ .subscribe();
+ }
+
private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> checkList) throws PulsarClientException {
Map<Consumer, Set<String>> consumerKeys = new HashMap<>();
for (KeyValue<Consumer<Integer>, Integer> check : checkList) {
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchMessageContainer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchMessageContainer.java
new file mode 100644
index 0000000..cf72962
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchMessageContainer.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Batch message container that holds individual messages being published until they are batched and sent to the broker.
+ */
+public interface BatchMessageContainer {
+
+ /**
+ * Clear the message batch container.
+ */
+ void clear();
+
+ /**
+ * Check whether the message batch container is empty.
+ *
+ * @return return true if empty, otherwise return false.
+ */
+ boolean isEmpty();
+
+ /**
+ * Get count of messages in the message batch container.
+ *
+ * @return messages count
+ */
+ int getNumMessagesInBatch();
+
+ /**
+ * Get current message batch size of the message batch container in bytes.
+ *
+ * @return message batch size in bytes
+ */
+ long getCurrentBatchSize();
+
+ /**
+ * Release the payload and clear the container.
+ *
+ * @param ex cause
+ */
+ void discard(Exception ex);
+
+ /**
+ * Return whether the batch container batches messages into multiple batches.
+ * @return true if the container produces multiple batches, otherwise false
+ */
+ boolean isMultiBatches();
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatcherBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatcherBuilder.java
new file mode 100644
index 0000000..705a0ac
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatcherBuilder.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.pulsar.client.internal.DefaultImplementation;
+
+/**
+ * Batcher builder, used to build a {@link BatchMessageContainer}.
+ */
+public interface BatcherBuilder {
+
+ /**
+ * Default batch message container
+ *
+ * incoming single messages:
+ * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
+ *
+ * batched into single batch message:
+ * [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
+ */
+ BatcherBuilder DEFAULT = DefaultImplementation.newDefaultBatcherBuilder();
+
+ /**
+ * Key based batch message container
+ *
+ * incoming single messages:
+ * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
+ *
+ * batched into multiple batch messages:
+ * [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
+ */
+ BatcherBuilder KEY_BASED = DefaultImplementation.newKeyBasedBatcherBuilder();
+
+ /**
+ * Build a new batch message container.
+ * @return new batch message container
+ */
+ BatchMessageContainer build();
+
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index a98036c..2e1a858 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -22,8 +22,6 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import javax.swing.plaf.basic.BasicInternalFrameTitlePane.MaximizeAction;
-
import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
/**
@@ -366,6 +364,16 @@ public interface ProducerBuilder<T> extends Cloneable {
ProducerBuilder<T> batchingMaxMessages(int batchMessagesMaxMessagesPerBatch);
/**
+ * Set the batcher builder {@link BatcherBuilder} of the producer. The producer will use the batcher builder to
+ * build a batch message container. This is only used when batching is enabled.
+ *
+ * @param batcherBuilder
+ * batcher builder
+ * @return the producer builder instance
+ */
+ ProducerBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder);
+
+ /**
* Set the baseline for the sequence ids for messages published by the producer.
* <p>
* First message will be using {@code (initialSequenceId + 1)} as its sequence id and subsequent messages will be assigned
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index d85b278..80282bf 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -36,6 +36,7 @@ import java.util.function.Supplier;
import lombok.experimental.UtilityClass;
import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
@@ -282,4 +283,16 @@ public class DefaultImplementation {
() -> (RecordSchemaBuilder) getConstructor("org.apache.pulsar.client.impl.schema.RecordSchemaBuilderImpl",
String.class).newInstance(name));
}
+
+ public static BatcherBuilder newDefaultBatcherBuilder() {
+ return catchExceptions(
+ () -> (BatcherBuilder) getConstructor("org.apache.pulsar.client.impl.DefaultBatcherBuilder")
+ .newInstance());
+ }
+
+ public static BatcherBuilder newKeyBasedBatcherBuilder() {
+ return catchExceptions(
+ () -> (BatcherBuilder) getConstructor("org.apache.pulsar.client.impl.KeyBasedBatcherBuilder")
+ .newInstance());
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
new file mode 100644
index 0000000..3d6ca4a
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Batch message container framework.
+ */
+public abstract class AbstractBatchMessageContainer implements BatchMessageContainerBase {
+
+ protected PulsarApi.CompressionType compressionType;
+ protected CompressionCodec compressor;
+ protected String topicName;
+ protected String producerName;
+ protected ProducerImpl producer;
+
+ protected int maxNumMessagesInBatch;
+ protected int numMessagesInBatch = 0;
+ protected long currentBatchSizeBytes = 0;
+
+ protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
+ protected static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024;
+
+ // This will be the largest size for a batch sent from this particular producer. This is used as a baseline to
+ // allocate a new buffer that can hold the entire batch without needing costly reallocations
+ protected int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE;
+
+ @Override
+ public boolean haveEnoughSpace(MessageImpl<?> msg) {
+ int messageSize = msg.getDataBuffer().readableBytes();
+ return ((messageSize + currentBatchSizeBytes) <= MAX_MESSAGE_BATCH_SIZE_BYTES
+ && numMessagesInBatch < maxNumMessagesInBatch);
+ }
+
+ @Override
+ public int getNumMessagesInBatch() {
+ return numMessagesInBatch;
+ }
+
+ @Override
+ public long getCurrentBatchSize() {
+ return currentBatchSizeBytes;
+ }
+
+ @Override
+ public List<ProducerImpl.OpSendMsg> createOpSendMsgs() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ProducerImpl.OpSendMsg createOpSendMsg() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setProducer(ProducerImpl<?> producer) {
+ this.producer = producer;
+ this.topicName = producer.getTopic();
+ this.producerName = producer.getProducerName();
+ this.compressionType = CompressionCodecProvider
+ .convertToWireProtocol(producer.getConfiguration().getCompressionType());
+ this.compressor = CompressionCodecProvider.getCompressionCodec(compressionType);
+ this.maxNumMessagesInBatch = producer.getConfiguration().getBatchingMaxMessages();
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
new file mode 100644
index 0000000..5f930bd
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.BatchMessageContainer;
+import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
+
+import java.io.IOException;
+import java.util.List;
+
+public interface BatchMessageContainerBase extends BatchMessageContainer {
+
+ /**
+ * Add message to the batch message container.
+ *
+ * @param msg the message to add to the batch message container
+ * @param callback message send callback
+ */
+ void add(MessageImpl<?> msg, SendCallback callback);
+
+ /**
+ * Check whether the batch message container has enough space for the message to be added.
+ *
+ * @param msg the message to be added
+ * @return true if the container has enough space for the given message,
+ * otherwise false.
+ */
+ boolean haveEnoughSpace(MessageImpl<?> msg);
+
+ /**
+ * Set producer of the message batch container.
+ *
+ * @param producer producer
+ */
+ void setProducer(ProducerImpl<?> producer);
+
+ /**
+ * Create a list of OpSendMsg; the producer uses each OpSendMsg to send a batch to the broker.
+ *
+ * @return list of OpSendMsg
+ * @throws IOException
+ */
+ List<OpSendMsg> createOpSendMsgs() throws IOException;
+
+ /**
+ * Create an OpSendMsg; the producer uses the OpSendMsg to send the batch to the broker.
+ *
+ * @return OpSendMsg
+ * @throws IOException
+ */
+ OpSendMsg createOpSendMsg() throws IOException;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
similarity index 52%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 0e1adee..b293572 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -22,63 +22,42 @@ import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
+import java.io.IOException;
import java.util.List;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
-import org.apache.pulsar.common.protocol.Commands;
+
import org.apache.pulsar.common.api.proto.PulsarApi;
-import org.apache.pulsar.common.compression.CompressionCodec;
-import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.protocol.ByteBufPair;
+import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * container for individual messages being published until they are batched and sent to broker
+ * Default batch message container
+ *
+ * incoming single messages:
+ * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
+ *
+ * batched into single batch message:
+ * [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
*/
+class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
-class BatchMessageContainer {
-
- private SendCallback previousCallback = null;
- private final PulsarApi.CompressionType compressionType;
- private final CompressionCodec compressor;
- private final String topicName;
- private final String producerName;
-
- final int maxNumMessagesInBatch;
-
- PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder();
- int numMessagesInBatch = 0;
- long currentBatchSizeBytes = 0;
+ private PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder();
// sequence id for this batch which will be persisted as a single entry by broker
- long sequenceId = -1;
- ByteBuf batchedMessageMetadataAndPayload;
- List<MessageImpl<?>> messages = Lists.newArrayList();
+ private long sequenceId = -1;
+ private ByteBuf batchedMessageMetadataAndPayload;
+ private List<MessageImpl<?>> messages = Lists.newArrayList();
+ protected SendCallback previousCallback = null;
// keep track of callbacks for individual messages being published in a batch
- SendCallback firstCallback;
-
- private static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
- protected static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024;
-
- // This will be the largest size for a batch sent from this particular producer. This is used as a baseline to
- // allocate a new buffer that can hold the entire batch without needing costly reallocations
- private int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE;
-
- BatchMessageContainer(int maxNumMessagesInBatch, PulsarApi.CompressionType compressionType, String topicName,
- String producerName) {
- this.maxNumMessagesInBatch = maxNumMessagesInBatch;
- this.compressionType = compressionType;
- this.compressor = CompressionCodecProvider.getCompressionCodec(compressionType);
- this.topicName = topicName;
- this.producerName = producerName;
- }
-
- boolean hasSpaceInBatch(MessageImpl<?> msg) {
- int messageSize = msg.getDataBuffer().readableBytes();
- return ((messageSize + currentBatchSizeBytes) <= MAX_MESSAGE_BATCH_SIZE_BYTES
- && numMessagesInBatch < maxNumMessagesInBatch);
- }
+ protected SendCallback firstCallback;
- void add(MessageImpl<?> msg, SendCallback callback) {
+ @Override
+ public void add(MessageImpl<?> msg, SendCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
@@ -98,16 +77,17 @@ class BatchMessageContainer {
previousCallback.addCallback(msg, callback);
}
previousCallback = callback;
-
currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
- PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder();
- batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
- msg.getDataBuffer(), batchedMessageMetadataAndPayload);
messages.add(msg);
- msgBuilder.recycle();
}
- ByteBuf getCompressedBatchMetadataAndPayload() {
+ private ByteBuf getCompressedBatchMetadataAndPayload() {
+ for (MessageImpl<?> msg : messages) {
+ PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder();
+ batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
+ msg.getDataBuffer(), batchedMessageMetadataAndPayload);
+ msgBuilder.recycle();
+ }
int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
batchedMessageMetadataAndPayload.release();
@@ -122,20 +102,8 @@ class BatchMessageContainer {
return compressedPayload;
}
- PulsarApi.MessageMetadata setBatchAndBuild() {
- messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] num messages in batch being closed are {}", topicName, producerName,
- numMessagesInBatch);
- }
- return messageMetadata.build();
- }
-
- ByteBuf getBatchedSingleMessageMetadataAndPayload() {
- return batchedMessageMetadataAndPayload;
- }
-
- void clear() {
+ @Override
+ public void clear() {
messages = Lists.newArrayList();
firstCallback = null;
previousCallback = null;
@@ -146,9 +114,52 @@ class BatchMessageContainer {
batchedMessageMetadataAndPayload = null;
}
- boolean isEmpty() {
+ @Override
+ public boolean isEmpty() {
return messages.isEmpty();
}
- private static final Logger log = LoggerFactory.getLogger(BatchMessageContainer.class);
+ @Override
+ public void discard(Exception ex) {
+ try {
+ // Need to protect ourselves from any exception being thrown in the future handler from the application
+ firstCallback.sendComplete(ex);
+ } catch (Throwable t) {
+ log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
+ sequenceId, t);
+ }
+ ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
+ clear();
+ }
+
+ @Override
+ public boolean isMultiBatches() {
+ return false;
+ }
+
+ @Override
+ public OpSendMsg createOpSendMsg() throws IOException {
+ ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
+ messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
+ ByteBufPair cmd = producer.sendMessage(producer.producerId, sequenceId, numMessagesInBatch,
+ messageMetadata.build(), encryptedPayload);
+
+ OpSendMsg op = OpSendMsg.create(messages, cmd, sequenceId, firstCallback);
+
+ if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+ cmd.release();
+ if (op != null) {
+ op.callback.sendComplete(new PulsarClientException.InvalidMessageException(
+ "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
+ op.recycle();
+ }
+ return null;
+ }
+
+ op.setNumMessagesInBatch(numMessagesInBatch);
+ op.setBatchSizeByte(currentBatchSizeBytes);
+ return op;
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(BatchMessageContainerImpl.class);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
new file mode 100644
index 0000000..a43afda
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.protocol.ByteBufPair;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Key based batch message container
+ *
+ * incoming single messages:
+ * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
+ *
+ * batched into multiple batch messages:
+ * [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
+ */
+class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
+
+ private Map<String, KeyedBatch> batches = new HashMap<>();
+
+ @Override
+ public void add(MessageImpl<?> msg, SendCallback callback) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName, producerName,
+ numMessagesInBatch);
+ }
+ numMessagesInBatch++;
+ currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
+ String key = getKey(msg);
+ KeyedBatch part = batches.get(key);
+ if (part == null) {
+ part = new KeyedBatch();
+ part.addMsg(msg, callback);
+ part.compressionType = compressionType;
+ part.compressor = compressor;
+ part.maxBatchSize = maxBatchSize;
+ batches.putIfAbsent(key, part);
+ } else {
+ part.addMsg(msg, callback);
+ }
+ }
+
+ @Override
+ public void clear() {
+ numMessagesInBatch = 0;
+ currentBatchSizeBytes = 0;
+ batches = new HashMap<>();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return batches.isEmpty();
+ }
+
+ @Override
+ public void discard(Exception ex) {
+ try {
+ // Need to protect ourselves from any exception being thrown in the future handler from the application
+ batches.forEach((k, v) -> v.firstCallback.sendComplete(ex));
+ } catch (Throwable t) {
+ log.warn("[{}] [{}] Got exception while completing the callback", topicName, producerName, t);
+ }
+ batches.forEach((k, v) -> ReferenceCountUtil.safeRelease(v.batchedMessageMetadataAndPayload));
+ clear();
+ }
+
+ @Override
+ public boolean isMultiBatches() {
+ return true;
+ }
+
+ private ProducerImpl.OpSendMsg createOpSendMsg(KeyedBatch keyedBatch) throws IOException {
+ ByteBuf encryptedPayload = producer.encryptMessage(keyedBatch.messageMetadata, keyedBatch.getCompressedBatchMetadataAndPayload());
+ final int numMessagesInBatch = keyedBatch.messages.size();
+ long currentBatchSizeBytes = 0;
+ for (MessageImpl<?> message : keyedBatch.messages) {
+ currentBatchSizeBytes += message.getDataBuffer().readableBytes();
+ }
+ keyedBatch.messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
+ ByteBufPair cmd = producer.sendMessage(producer.producerId, keyedBatch.sequenceId, numMessagesInBatch,
+ keyedBatch.messageMetadata.build(), encryptedPayload);
+
+ ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create(keyedBatch.messages, cmd, keyedBatch.sequenceId, keyedBatch.firstCallback);
+
+ if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+ cmd.release();
+ if (op != null) {
+ op.callback.sendComplete(new PulsarClientException.InvalidMessageException(
+ "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
+ op.recycle();
+ }
+ return null;
+ }
+ op.setNumMessagesInBatch(numMessagesInBatch);
+ op.setBatchSizeByte(currentBatchSizeBytes);
+ return op;
+ }
+
+ @Override
+ public List<ProducerImpl.OpSendMsg> createOpSendMsgs() throws IOException {
+ List<ProducerImpl.OpSendMsg> result = new ArrayList<>();
+ List<KeyedBatch> list = new ArrayList<>(batches.values());
+ list.sort(((o1, o2) -> ComparisonChain.start()
+ .compare(o1.sequenceId, o2.sequenceId)
+ .result()));
+ for (KeyedBatch keyedBatch : list) {
+ ProducerImpl.OpSendMsg op = createOpSendMsg(keyedBatch);
+ if (op != null) {
+ result.add(op);
+ }
+ }
+ return result;
+ }
+
+ private String getKey(MessageImpl<?> msg) {
+ if (msg.hasOrderingKey()) {
+ return Base64.getEncoder().encodeToString(msg.getOrderingKey());
+ }
+ return msg.getKey();
+ }
+
+ private static class KeyedBatch {
+ private PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder();
+ // sequence id for this batch which will be persisted as a single entry by broker
+ private long sequenceId = -1;
+ private ByteBuf batchedMessageMetadataAndPayload;
+ private List<MessageImpl<?>> messages = Lists.newArrayList();
+ private SendCallback previousCallback = null;
+ private PulsarApi.CompressionType compressionType;
+ private CompressionCodec compressor;
+ private int maxBatchSize;
+
+ // keep track of callbacks for individual messages being published in a batch
+ private SendCallback firstCallback;
+
+ private ByteBuf getCompressedBatchMetadataAndPayload() {
+ for (MessageImpl<?> msg : messages) {
+ PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder();
+ batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder,
+ msg.getDataBuffer(), batchedMessageMetadataAndPayload);
+ msgBuilder.recycle();
+ }
+ int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
+ ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
+ batchedMessageMetadataAndPayload.release();
+ if (compressionType != PulsarApi.CompressionType.NONE) {
+ messageMetadata.setCompression(compressionType);
+ messageMetadata.setUncompressedSize(uncompressedSize);
+ }
+
+ // Update the current max batch size using the uncompressed size, which is what we need in any case to
+ // accumulate the batch content
+ maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
+ return compressedPayload;
+ }
+
+ private void addMsg(MessageImpl<?> msg, SendCallback callback) {
+ if (messages.size() == 0) {
+ sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
+ if (msg.hasKey()) {
+ messageMetadata.setPartitionKey(msg.getKey());
+ if (msg.hasBase64EncodedKey()) {
+ messageMetadata.setPartitionKeyB64Encoded(true);
+ }
+ }
+ if (msg.hasOrderingKey()) {
+ messageMetadata.setOrderingKey(ByteString.copyFrom(msg.getOrderingKey()));
+ }
+ batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
+ .buffer(Math.min(maxBatchSize, MAX_MESSAGE_BATCH_SIZE_BYTES));
+ firstCallback = callback;
+ }
+ if (previousCallback != null) {
+ previousCallback.addCallback(msg, callback);
+ }
+ previousCallback = callback;
+ messages.add(msg);
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(BatchMessageKeyBasedContainer.class);
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultBatcherBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultBatcherBuilder.java
new file mode 100644
index 0000000..17c2eb4
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultBatcherBuilder.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.BatchMessageContainer;
+import org.apache.pulsar.client.api.BatcherBuilder;
+
+public class DefaultBatcherBuilder implements BatcherBuilder {
+
+ @Override
+ public BatchMessageContainer build() {
+ return new BatchMessageContainerImpl();
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/KeyBasedBatcherBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/KeyBasedBatcherBuilder.java
new file mode 100644
index 0000000..8a4e09e
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/KeyBasedBatcherBuilder.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.BatchMessageContainer;
+import org.apache.pulsar.client.api.BatcherBuilder;
+
+public class KeyBasedBatcherBuilder implements BatcherBuilder {
+
+ @Override
+ public BatchMessageContainer build() {
+ return new BatchMessageKeyBasedContainer();
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 888b58e..77f4dbe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
@@ -215,6 +216,13 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
}
@Override
+ public ProducerBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder) {
+ conf.setBatcherBuilder(batcherBuilder);
+ return this;
+ }
+
+
+ @Override
public ProducerBuilder<T> initialSequenceId(long initialSequenceId) {
conf.setInitialSequenceId(initialSequenceId);
return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index ab6b5b6..48656f8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -75,7 +76,7 @@ import org.slf4j.LoggerFactory;
public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, ConnectionHandler.Connection {
// Producer id, used to identify a producer within a single connection
- private final long producerId;
+ protected final long producerId;
// Variable is used through the atomic updater
private volatile long msgIdGenerator;
@@ -87,7 +88,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private volatile Timeout batchMessageAndSendTimeout = null;
private long createProducerTimeout;
private final int maxNumMessagesInBatch;
- private final BatchMessageContainer batchMessageContainer;
+ private final BatchMessageContainerBase batchMessageContainer;
private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);
// Globally unique producer name
@@ -163,8 +164,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
this.createProducerTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs();
if (conf.isBatchingEnabled()) {
this.maxNumMessagesInBatch = conf.getBatchingMaxMessages();
- this.batchMessageContainer = new BatchMessageContainer(maxNumMessagesInBatch,
- CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()), topic, producerName);
+ BatcherBuilder containerBuilder = conf.getBatcherBuilder();
+ if (containerBuilder == null) {
+ containerBuilder = BatcherBuilder.DEFAULT;
+ }
+ this.batchMessageContainer = (BatchMessageContainerBase)containerBuilder.build();
+ this.batchMessageContainer.setProducer(this);
} else {
this.maxNumMessagesInBatch = 1;
this.batchMessageContainer = null;
@@ -363,12 +368,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
if (isBatchMessagingEnabled() && !msgMetadataBuilder.hasDeliverAtTime()) {
// handle boundary cases where message being added would exceed
// batch size and/or max message size
- if (batchMessageContainer.hasSpaceInBatch(msg)) {
+ if (batchMessageContainer.haveEnoughSpace(msg)) {
batchMessageContainer.add(msg, callback);
lastSendFuture = callback.getFuture();
payload.release();
- if (batchMessageContainer.numMessagesInBatch == maxNumMessagesInBatch
- || batchMessageContainer.currentBatchSizeBytes >= BatchMessageContainer.MAX_MESSAGE_BATCH_SIZE_BYTES) {
+ if (batchMessageContainer.getNumMessagesInBatch() == maxNumMessagesInBatch
+ || batchMessageContainer.getCurrentBatchSize() >= BatchMessageContainerImpl.MAX_MESSAGE_BATCH_SIZE_BYTES) {
batchMessageAndSend();
}
} else {
@@ -425,7 +430,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
}
- private ByteBuf encryptMessage(MessageMetadata.Builder msgMetadata, ByteBuf compressedPayload)
+ protected ByteBuf encryptMessage(MessageMetadata.Builder msgMetadata, ByteBuf compressedPayload)
throws PulsarClientException {
ByteBuf encryptedPayload = compressedPayload;
@@ -447,7 +452,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
return encryptedPayload;
}
- private ByteBufPair sendMessage(long producerId, long sequenceId, int numMessages, MessageMetadata msgMetadata,
+ protected ByteBufPair sendMessage(long producerId, long sequenceId, int numMessages, MessageMetadata msgMetadata,
ByteBuf compressedPayload) throws IOException {
ChecksumType checksumType;
@@ -1224,17 +1229,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
if (batchMessageContainer.isEmpty()) {
return;
}
- int numMessagesInBatch = batchMessageContainer.numMessagesInBatch;
+ int numMessagesInBatch = batchMessageContainer.getNumMessagesInBatch();
semaphore.release(numMessagesInBatch);
- try {
- // Need to protect ourselves from any exception being thrown in the future handler from the application
- batchMessageContainer.firstCallback.sendComplete(ex);
- } catch (Throwable t) {
- log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
- batchMessageContainer.sequenceId, t);
- }
- ReferenceCountUtil.safeRelease(batchMessageContainer.getBatchedSingleMessageMetadataAndPayload());
- batchMessageContainer.clear();
+ batchMessageContainer.discard(ex);
}
TimerTask batchMessageAndSendTask = new TimerTask() {
@@ -1288,68 +1285,56 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private void batchMessageAndSend() {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Batching the messages from the batch container with {} messages", topic, producerName,
- batchMessageContainer.numMessagesInBatch);
+ batchMessageContainer.getNumMessagesInBatch());
}
- OpSendMsg op = null;
- int numMessagesInBatch = 0;
- try {
- if (!batchMessageContainer.isEmpty()) {
- numMessagesInBatch = batchMessageContainer.numMessagesInBatch;
- ByteBuf compressedPayload = batchMessageContainer.getCompressedBatchMetadataAndPayload();
- long sequenceId = batchMessageContainer.sequenceId;
- ByteBuf encryptedPayload = encryptMessage(batchMessageContainer.messageMetadata, compressedPayload);
-
- ByteBufPair cmd = sendMessage(producerId, sequenceId, batchMessageContainer.numMessagesInBatch,
- batchMessageContainer.setBatchAndBuild(), encryptedPayload);
-
- op = OpSendMsg.create(batchMessageContainer.messages, cmd, sequenceId,
- batchMessageContainer.firstCallback);
-
- if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
- cmd.release();
- semaphore.release(numMessagesInBatch);
- if (op != null) {
- op.callback.sendComplete(new PulsarClientException.InvalidMessageException(
- "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
+ if (!batchMessageContainer.isEmpty()) {
+ try {
+ if (batchMessageContainer.isMultiBatches()) {
+ List<OpSendMsg> opSendMsgs = batchMessageContainer.createOpSendMsgs();
+ for (OpSendMsg opSendMsg : opSendMsgs) {
+ processOpSendMsg(opSendMsg);
}
- return;
- }
-
- op.setNumMessagesInBatch(batchMessageContainer.numMessagesInBatch);
- op.setBatchSizeByte(batchMessageContainer.currentBatchSizeBytes);
-
- batchMessageContainer.clear();
-
- pendingMessages.put(op);
-
- ClientCnx cnx = cnx();
- if (isConnected()) {
- // If we do have a connection, the message is sent immediately, otherwise we'll try again once a new
- // connection is established
- cmd.retain();
- cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
- stats.updateNumMsgsSent(numMessagesInBatch, op.batchSizeByte);
} else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", topic, producerName,
- sequenceId);
+ OpSendMsg opSendMsg = batchMessageContainer.createOpSendMsg();
+ if (opSendMsg != null) {
+ processOpSendMsg(opSendMsg);
}
}
+ } catch (PulsarClientException e) {
+ log.warn("[{}] [{}] Failed to create opSendMsg from batch message container", topic, producerName, e);
+ semaphore.release(batchMessageContainer.getNumMessagesInBatch());
+ } catch (Throwable t) {
+ semaphore.release(batchMessageContainer.getNumMessagesInBatch());
+ log.warn("[{}] [{}] error while create opSendMsg by batch message container -- {}", topic, producerName, t);
+ }
+ }
+ }
+
+ private void processOpSendMsg(OpSendMsg op) {
+ try {
+ batchMessageContainer.clear();
+ pendingMessages.put(op);
+ ClientCnx cnx = cnx();
+ if (isConnected()) {
+ // If we do have a connection, the message is sent immediately, otherwise we'll try again once a new
+ // connection is established
+ op.cmd.retain();
+ cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
+ stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", topic, producerName,
+ op.sequenceId);
+ }
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
- semaphore.release(numMessagesInBatch);
+ semaphore.release(op.numMessagesInBatch);
if (op != null) {
op.callback.sendComplete(new PulsarClientException(ie));
}
- } catch (PulsarClientException e) {
- Thread.currentThread().interrupt();
- semaphore.release(numMessagesInBatch);
- if (op != null) {
- op.callback.sendComplete(e);
- }
} catch (Throwable t) {
- semaphore.release(numMessagesInBatch);
+ semaphore.release(op.numMessagesInBatch);
log.warn("[{}] [{}] error while closing out batch -- {}", topic, producerName, t);
if (op != null) {
op.callback.sendComplete(new PulsarClientException(t));
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 7ec8f90..5720420 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -26,8 +26,8 @@ import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.NoArgsConstructor;
+import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
@@ -66,6 +66,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
private int batchingMaxMessages = 1000;
private boolean batchingEnabled = true; // enabled by default
+ private BatcherBuilder batcherBuilder = BatcherBuilder.DEFAULT;
@JsonIgnore
private CryptoKeyReader cryptoKeyReader;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
index 626c501..bb0092f 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
@@ -59,6 +59,7 @@ public class ConfigurationDataUtilsTest {
Map<String, Object> config = new HashMap<>();
config.put("producerName", "test-producer");
config.put("batchingEnabled", false);
+ confData.setBatcherBuilder(null);
confData = ConfigurationDataUtils.loadData(config, confData, ProducerConfigurationData.class);
assertEquals("test-producer", confData.getProducerName());
assertEquals(false, confData.isBatchingEnabled());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 1de08cf..ba801d9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1158,6 +1158,9 @@ public class Commands {
singleMessageMetadataBuilder = singleMessageMetadataBuilder.setPartitionKey(msgBuilder.getPartitionKey())
.setPartitionKeyB64Encoded(msgBuilder.getPartitionKeyB64Encoded());
}
+ if (msgBuilder.hasOrderingKey()) {
+ singleMessageMetadataBuilder = singleMessageMetadataBuilder.setOrderingKey(msgBuilder.getOrderingKey());
+ }
if (!msgBuilder.getPropertiesList().isEmpty()) {
singleMessageMetadataBuilder = singleMessageMetadataBuilder
.addAllProperties(msgBuilder.getPropertiesList());
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
index 8432b1c..d331ca2 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -91,6 +91,7 @@ public class PulsarBolt extends BaseRichBolt implements IMetric {
this.producerConf = producerConf;
this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
this.producerConf.setTopicName(pulsarBoltConf.getTopic());
+ this.producerConf.setBatcherBuilder(null);
}
@SuppressWarnings({ "rawtypes" })