You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/05/24 09:22:05 UTC
[pulsar] 02/02: [Java Client] Fix wrong behavior of deduplication for key based batching (#15413)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit dc861c46b5b57693f0f4e132a70ba9153096775e
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri May 13 20:18:42 2022 +0800
[Java Client] Fix wrong behavior of deduplication for key based batching (#15413)
### Motivation
Currently, message deduplication doesn't work well for key based
batching. First, the key based batch container doesn't update
`lastSequenceIdPushed`, so a batch could contain both duplicated and
non-duplicated messages. Second, when `createOpSendMsgs` is called, the
`OpSendMsg` objects are sorted by their lowest sequence ids, and the
highest sequence id is not set. If a batch contains sequence ids 0, 1, 2,
then a message with sequence id 1 or 2 that is sent afterwards won't be dropped.
### Modifications
- Refactor the key based batch container so that
`BatchMessageContainerImpl` is reused instead of maintaining a separate
`KeyedBatch` class.
- When `createOpSendMsgs` is called, clear the highest sequence id field
and configure the sequence id field with the highest sequence id to fix
the second issue described before.
- Add `testKeyBasedBatchingOrder` to show and verify the current
behavior.
- Add a test for key based batching to
`testProducerDeduplicationWithDiscontinuousSequenceId` to verify that
`lastSequenceIdPushed` is updated correctly.
(cherry picked from commit a77333705ffb352da39767d53975353bb4f8864e)
---
.../pulsar/client/api/ClientDeduplicationTest.java | 98 +++++++++-
.../client/impl/BatchMessageContainerImpl.java | 28 ++-
.../client/impl/BatchMessageKeyBasedContainer.java | 211 +++++----------------
3 files changed, 155 insertions(+), 182 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index 304bb6eaaa0..52017444a2b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -20,19 +20,37 @@ package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+@Slf4j
@Test(groups = "flaky")
public class ClientDeduplicationTest extends ProducerConsumerBase {
+ /**
+ * Supplies the batcher builders ({@code DEFAULT} and {@code KEY_BASED}) for tests that declare
+ * {@code dataProvider = "batchingTypes"}; each row holds a single {@code BatcherBuilder} argument.
+ */
+ @DataProvider
+ public static Object[][] batchingTypes() {
+ return new Object[][] {
+ { BatcherBuilder.DEFAULT },
+ { BatcherBuilder.KEY_BASED }
+ };
+ }
+
@BeforeClass
@Override
protected void setup() throws Exception {
@@ -46,7 +64,7 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
super.internalCleanup();
}
- @Test
+ @Test(priority = -1)
public void testNamespaceDeduplicationApi() throws Exception {
final String namespace = "my-property/my-ns";
assertNull(admin.namespaces().getDeduplicationStatus(namespace));
@@ -174,9 +192,10 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
producer.close();
}
- @Test(timeOut = 30000)
- public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Exception {
- String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId";
+ @Test(timeOut = 30000, dataProvider = "batchingTypes")
+ public void testProducerDeduplicationWithDiscontinuousSequenceId(BatcherBuilder batcherBuilder) throws Exception {
+ String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId-"
+ + System.currentTimeMillis();
admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
// Set infinite timeout
@@ -185,7 +204,9 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
.topic(topic)
.producerName("my-producer-name")
.enableBatching(true)
+ .batcherBuilder(batcherBuilder)
.batchingMaxMessages(10)
+ .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
.sendTimeout(0, TimeUnit.SECONDS);
Producer<byte[]> producer = producerBuilder.create();
@@ -208,7 +229,8 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
producer.flush();
for (int i = 0; i < 4; i++) {
- Message<byte[]> msg = consumer.receive();
+ Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
+ assertNotNull(msg);
assertEquals(new String(msg.getData()), "my-message-" + i);
consumer.acknowledge(msg);
}
@@ -284,4 +306,68 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
producer.close();
}
+
+ /**
+ * Verifies the delivery order of key based batches when deduplication is enabled, and that re-sending sequence
+ * ids that were already published returns an invalid message id (ledger id and entry id of -1).
+ */
+ @Test(timeOut = 30000)
+ public void testKeyBasedBatchingOrder() throws Exception {
+ final String topic = "persistent://my-property/my-ns/test-key-based-batching-order";
+ admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+
+ final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscribe();
+ // The one hour publish delay means batches are only sent by the explicit flush() below.
+ final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .batcherBuilder(BatcherBuilder.KEY_BASED)
+ .batchingMaxMessages(100)
+ .batchingMaxBytes(1024 * 1024 * 5)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+ .create();
+ // | key | sequence id list |
+ // | :-- | :--------------- |
+ // | A | 0, 3, 4 |
+ // | B | 1, 2 |
+ final List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+ sendFutures.add(producer.newMessage().key("A").value("msg-0").sequenceId(0L).sendAsync());
+ sendFutures.add(producer.newMessage().key("B").value("msg-1").sequenceId(1L).sendAsync());
+ sendFutures.add(producer.newMessage().key("B").value("msg-2").sequenceId(2L).sendAsync());
+ sendFutures.add(producer.newMessage().key("A").value("msg-3").sequenceId(3L).sendAsync());
+ sendFutures.add(producer.newMessage().key("A").value("msg-4").sequenceId(4L).sendAsync());
+ // The message order is expected to be [1, 2, 0, 3, 4]. The sequence ids are not ordered strictly, but:
+ // 1. The sequence ids for a given key are ordered.
+ // 2. The highest sequence ids of batches are ordered.
+ producer.flush();
+
+ FutureUtil.waitForAll(sendFutures);
+ final List<MessageId> sendMessageIds = sendFutures.stream().map(CompletableFuture::join)
+ .collect(Collectors.toList());
+ for (int i = 0; i < sendMessageIds.size(); i++) {
+ log.info("Send msg-{} to {}", i, sendMessageIds.get(i));
+ }
+
+ final List<Long> sequenceIdList = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ final Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
+ if (msg == null) {
+ // Stop on receive timeout; the list comparison below then reports which sequence ids are missing.
+ break;
+ }
+ log.info("Received {}, key: {}, seq id: {}, msg id: {}",
+ msg.getValue(), msg.getKey(), msg.getSequenceId(), msg.getMessageId());
+ sequenceIdList.add(msg.getSequenceId());
+ }
+ assertEquals(sequenceIdList, Arrays.asList(1L, 2L, 0L, 3L, 4L));
+
+ for (int i = 0; i < 5; i++) {
+ // Currently sending a duplicated message won't throw an exception. Instead, an invalid result is returned.
+ final MessageId messageId = producer.newMessage().value("msg").sequenceId(i).send();
+ assertTrue(messageId instanceof BatchMessageIdImpl);
+ final BatchMessageIdImpl messageIdImpl = (BatchMessageIdImpl) messageId;
+ assertEquals(messageIdImpl.getLedgerId(), -1L);
+ assertEquals(messageIdImpl.getEntryId(), -1L);
+ }
+
+ consumer.close();
+ producer.close();
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index cea567e8d68..996875a7131 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -19,13 +19,13 @@
package org.apache.pulsar.client.impl;
import com.google.common.collect.Lists;
-
import io.netty.buffer.ByteBuf;
-
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
-
+import lombok.Getter;
+import lombok.Setter;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -49,7 +49,11 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
private MessageMetadata messageMetadata = new MessageMetadata();
// sequence id for this batch which will be persisted as a single entry by broker
+ @Getter
+ @Setter
private long lowestSequenceId = -1L;
+ @Getter
+ @Setter
private long highestSequenceId = -1L;
private ByteBuf batchedMessageMetadataAndPayload;
private List<MessageImpl<?>> messages = Lists.newArrayList();
@@ -57,6 +61,14 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
// keep track of callbacks for individual messages being published in a batch
protected SendCallback firstCallback;
+ /** Creates a container without setting a producer. */
+ public BatchMessageContainerImpl() {
+ }
+
+ /**
+ * Creates a container and passes {@code producer} to {@code setProducer}, so callers do not need a separate
+ * setter call after construction.
+ *
+ * @param producer the producer handed to {@code setProducer}
+ */
+ public BatchMessageContainerImpl(ProducerImpl<?> producer) {
+ this();
+ setProducer(producer);
+ }
+
@Override
public boolean add(MessageImpl<?> msg, SendCallback callback) {
@@ -82,10 +94,6 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
}
} catch (Throwable e) {
log.error("construct first message failed, exception is ", e);
- if (batchedMessageMetadataAndPayload != null) {
- // if payload has been allocated release it
- batchedMessageMetadataAndPayload.release();
- }
discard(new PulsarClientException(e));
return false;
}
@@ -104,7 +112,6 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
}
highestSequenceId = msg.getSequenceId();
ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));
-
return isBatchFull();
}
@@ -172,6 +179,10 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
if (firstCallback != null) {
firstCallback.sendComplete(ex);
}
+ if (batchedMessageMetadataAndPayload != null) {
+ ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
+ batchedMessageMetadataAndPayload = null;
+ }
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
lowestSequenceId, t);
@@ -193,6 +204,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
return null;
}
messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
+ messageMetadata.setSequenceId(lowestSequenceId);
messageMetadata.setHighestSequenceId(highestSequenceId);
if (currentTxnidMostBits != -1) {
messageMetadata.setTxnidMostBits(currentTxnidMostBits);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index 505ca75743c..77990eeeacb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -18,27 +18,12 @@
*/
package org.apache.pulsar.client.impl;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Lists;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.ReferenceCountUtil;
-
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
-import org.apache.pulsar.common.api.proto.CompressionType;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.compression.CompressionCodec;
-import org.apache.pulsar.common.protocol.ByteBufPair;
-import org.apache.pulsar.common.protocol.Commands;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +38,7 @@ import org.slf4j.LoggerFactory;
*/
class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
- private Map<String, KeyedBatch> batches = new HashMap<>();
+ private final Map<String, BatchMessageContainerImpl> batches = new HashMap<>();
@Override
public boolean add(MessageImpl<?> msg, SendCallback callback) {
@@ -61,29 +46,16 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName, producerName,
numMessagesInBatch);
}
- numMessagesInBatch++;
- currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
String key = getKey(msg);
- KeyedBatch part = batches.get(key);
- if (part == null) {
- part = new KeyedBatch();
- part.addMsg(msg, callback);
- part.compressionType = compressionType;
- part.compressor = compressor;
- part.maxBatchSize = maxBatchSize;
- part.topicName = topicName;
- part.producerName = producerName;
- batches.putIfAbsent(key, part);
-
- if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {
- currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
- }
- if (msg.getMessageBuilder().hasTxnidLeastBits() && currentTxnidLeastBits == -1) {
- currentTxnidLeastBits = msg.getMessageBuilder().getTxnidLeastBits();
- }
-
- } else {
- part.addMsg(msg, callback);
+ final BatchMessageContainerImpl batchMessageContainer = batches.computeIfAbsent(key,
+ __ -> new BatchMessageContainerImpl(producer));
+ batchMessageContainer.add(msg, callback);
+ // The `add` method fails iff the container is empty, i.e. the `msg` is the first message to add, while `msg`
+ // was failed to add. In this case, `clear` method will be called and the batch container is empty and there is
+ // no need to update the stats.
+ if (!batchMessageContainer.isEmpty()) {
+ numMessagesInBatch++;
+ currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
}
return isBatchFull();
}
@@ -92,7 +64,7 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
public void clear() {
numMessagesInBatch = 0;
currentBatchSizeBytes = 0;
- batches = new HashMap<>();
+ batches.clear();
currentTxnidMostBits = -1L;
currentTxnidLeastBits = -1L;
}
@@ -104,13 +76,7 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
+ /**
+ * Discards every per-key batch with the given exception and then resets this container via {@code clear()}.
+ *
+ * @param ex the exception forwarded to each per-key batch's {@code discard}
+ */
@Override
public void discard(Exception ex) {
- try {
- // Need to protect ourselves from any exception being thrown in the future handler from the application
- batches.forEach((k, v) -> v.firstCallback.sendComplete(ex));
- } catch (Throwable t) {
- log.warn("[{}] [{}] Got exception while completing the callback", topicName, producerName, t);
- }
- batches.forEach((k, v) -> ReferenceCountUtil.safeRelease(v.batchedMessageMetadataAndPayload));
+ // Callback completion and buffer release are delegated to each per-key container's own discard(ex).
+ // NOTE(review): presumably BatchMessageContainerImpl.discard guards against callback exceptions - confirm.
+ batches.forEach((k, v) -> v.discard(ex));
clear();
}
@@ -119,64 +85,45 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
return true;
}
- private ProducerImpl.OpSendMsg createOpSendMsg(KeyedBatch keyedBatch) throws IOException {
- ByteBuf encryptedPayload = producer.encryptMessage(keyedBatch.messageMetadata, keyedBatch.getCompressedBatchMetadataAndPayload());
- if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
- keyedBatch.discard(new PulsarClientException.InvalidMessageException(
- "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
- return null;
- }
-
- final int numMessagesInBatch = keyedBatch.messages.size();
- long currentBatchSizeBytes = 0;
- for (MessageImpl<?> message : keyedBatch.messages) {
- currentBatchSizeBytes += message.getDataBuffer().readableBytes();
- }
- keyedBatch.messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
- if (currentTxnidMostBits != -1) {
- keyedBatch.messageMetadata.setTxnidMostBits(currentTxnidMostBits);
- }
- if (currentTxnidLeastBits != -1) {
- keyedBatch.messageMetadata.setTxnidLeastBits(currentTxnidLeastBits);
- }
- ByteBufPair cmd = producer.sendMessage(producer.producerId, keyedBatch.sequenceId, numMessagesInBatch,
- keyedBatch.messageMetadata, encryptedPayload);
-
- ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create(keyedBatch.messages, cmd, keyedBatch.sequenceId, keyedBatch.firstCallback);
-
- op.setNumMessagesInBatch(numMessagesInBatch);
- op.setBatchSizeByte(currentBatchSizeBytes);
- return op;
- }
-
+ /**
+ * Builds one {@code OpSendMsg} per key based batch, ordered by the highest sequence id of each batch.
+ *
+ * @return the created operations in ascending order of highest sequence id; batches for which no
+ * {@code OpSendMsg} was created (null) are skipped
+ * @throws IOException if creating the {@code OpSendMsg} of any batch fails
+ */
@Override
public List<ProducerImpl.OpSendMsg> createOpSendMsgs() throws IOException {
- List<ProducerImpl.OpSendMsg> result = new ArrayList<>();
- List<KeyedBatch> list = new ArrayList<>(batches.values());
- list.sort(((o1, o2) -> ComparisonChain.start()
- .compare(o1.sequenceId, o2.sequenceId)
- .result()));
- for (KeyedBatch keyedBatch : list) {
- ProducerImpl.OpSendMsg op = createOpSendMsg(keyedBatch);
- if (op != null) {
- result.add(op);
+ try {
+ // In key based batching, the sequence ids might not be ordered, for example,
+ // | key | sequence id list |
+ // | :-- | :--------------- |
+ // | A | 0, 3, 4 |
+ // | B | 1, 2 |
+ // The message order should be 1, 2, 0, 3, 4 so that a message with a sequence id <= 4 should be dropped.
+ // However, for a MessageMetadata with both `sequence_id` and `highest_sequence_id` fields, the broker will
+ // expect a strict order so that the batch of key "A" (0, 3, 4) will be dropped.
+ // Therefore, the lowest sequence id of each batch is overwritten with its highest sequence id before
+ // sorting, so that batches are ordered and identified by their highest sequence id (the weak order).
+ batches.values().forEach(batchMessageContainer -> {
+ batchMessageContainer.setLowestSequenceId(batchMessageContainer.getHighestSequenceId());
+ });
+ // Long.compare avoids the overflow/truncation of casting a difference of two longs to int.
+ return batches.values().stream().sorted((o1, o2) ->
+ Long.compare(o1.getLowestSequenceId(), o2.getLowestSequenceId())
+ ).map(batchMessageContainer -> {
+ try {
+ return batchMessageContainer.createOpSendMsg();
+ } catch (IOException e) {
+ // Tunnel the checked exception out of the stream; it is unwrapped in the outer catch.
+ throw new IllegalStateException(e);
+ }
+ // Skip batches that produced no OpSendMsg, as the previous loop-based implementation did.
+ }).filter(op -> op != null).collect(Collectors.toList());
+ } catch (IllegalStateException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw e;
}
}
- return result;
}
+ /**
+ * Checks the schema of {@code msg} against the batch that is kept for the message's key.
+ *
+ * @param msg the message about to be added
+ * @return {@code true} if no batch exists for the key, otherwise the result of that batch's
+ * {@code hasSameSchema(msg)}
+ */
@Override
public boolean hasSameSchema(MessageImpl<?> msg) {
String key = getKey(msg);
- KeyedBatch part = batches.get(key);
- if (part == null || part.messages.isEmpty()) {
- return true;
- }
- if (!part.messageMetadata.hasSchemaVersion()) {
- return msg.getSchemaVersion() == null;
- }
- return Arrays.equals(msg.getSchemaVersion(),
- part.messageMetadata.getSchemaVersion());
+ BatchMessageContainerImpl batchMessageContainer = batches.get(key);
+ // With no batch for this key there is nothing to conflict with; otherwise delegate to the per-key batch.
+ return batchMessageContainer == null || batchMessageContainer.hasSameSchema(msg);
}
private String getKey(MessageImpl<?> msg) {
@@ -186,78 +133,6 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
return msg.getKey();
}
- private static class KeyedBatch {
- private final MessageMetadata messageMetadata = new MessageMetadata();
- // sequence id for this batch which will be persisted as a single entry by broker
- private long sequenceId = -1;
- private ByteBuf batchedMessageMetadataAndPayload;
- private List<MessageImpl<?>> messages = Lists.newArrayList();
- private SendCallback previousCallback = null;
- private CompressionType compressionType;
- private CompressionCodec compressor;
- private int maxBatchSize;
- private String topicName;
- private String producerName;
-
- // keep track of callbacks for individual messages being published in a batch
- private SendCallback firstCallback;
-
- private ByteBuf getCompressedBatchMetadataAndPayload() {
- for (MessageImpl<?> msg : messages) {
- batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msg.getMessageBuilder(),
- msg.getDataBuffer(), batchedMessageMetadataAndPayload);
- }
- int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
- ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
- batchedMessageMetadataAndPayload.release();
- if (compressionType != CompressionType.NONE) {
- messageMetadata.setCompression(compressionType);
- messageMetadata.setUncompressedSize(uncompressedSize);
- }
-
- // Update the current max batch size using the uncompressed size, which is what we need in any case to
- // accumulate the batch content
- maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
- return compressedPayload;
- }
-
- private void addMsg(MessageImpl<?> msg, SendCallback callback) {
- if (messages.size() == 0) {
- sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
- batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
- .buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
- firstCallback = callback;
- }
- if (previousCallback != null) {
- previousCallback.addCallback(msg, callback);
- }
- previousCallback = callback;
- messages.add(msg);
- }
-
- public void discard(Exception ex) {
- try {
- // Need to protect ourselves from any exception being thrown in the future handler from the application
- if (firstCallback != null) {
- firstCallback.sendComplete(ex);
- }
- } catch (Throwable t) {
- log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
- sequenceId, t);
- }
- clear();
- }
-
- public void clear() {
- messages = Lists.newArrayList();
- firstCallback = null;
- previousCallback = null;
- messageMetadata.clear();
- sequenceId = -1;
- batchedMessageMetadataAndPayload = null;
- }
- }
-
private static final Logger log = LoggerFactory.getLogger(BatchMessageKeyBasedContainer.class);
}