You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/05/24 09:22:05 UTC

[pulsar] 02/02: [Java Client] Fix wrong behavior of deduplication for key based batching (#15413)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dc861c46b5b57693f0f4e132a70ba9153096775e
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri May 13 20:18:42 2022 +0800

    [Java Client] Fix wrong behavior of deduplication for key based batching (#15413)
    
    ### Motivation
    
    Currently message deduplication doesn't work well for key based
    batching. First, the key based batch container doesn't update the
    `lastSequenceIdPushed`. So a batch could contain both duplicated and not
    duplicated messages. Second, when `createOpSendMsgs` is called, the
    `OpSendMsg` objects are sorted by the lowest sequence ids, and the
    highest sequence id is not set. If a batch contains sequence id 0,1,2,
    then the message with sequence id 1 or 2 won't be dropped.
    
    ### Modifications
    
    - Refactor the key based batch container that the
      `BatchMessageContainerImpl` is reused instead of maintaining a
      `KeyedBatch` class.
    - When `createOpSendMsgs` is called, clear the highest sequence id field
      and configure the sequence id field with the highest sequence id to fix
      the second issue described before.
    - Add `testKeyBasedBatchingOrder` to show and verify the current
      behavior.
    - Add test for key based batching into
      `testProducerDeduplicationWithDiscontinuousSequenceId` to verify
      `lastSlastSequenceIdPushed` is updated correctly.
    
    (cherry picked from commit a77333705ffb352da39767d53975353bb4f8864e)
---
 .../pulsar/client/api/ClientDeduplicationTest.java |  98 +++++++++-
 .../client/impl/BatchMessageContainerImpl.java     |  28 ++-
 .../client/impl/BatchMessageKeyBasedContainer.java | 211 +++++----------------
 3 files changed, 155 insertions(+), 182 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index 304bb6eaaa0..52017444a2b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -20,19 +20,37 @@ package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+@Slf4j
 @Test(groups = "flaky")
 public class ClientDeduplicationTest extends ProducerConsumerBase {
 
+    @DataProvider
+    public static Object[][] batchingTypes() {
+        return new Object[][] {
+                { BatcherBuilder.DEFAULT },
+                { BatcherBuilder.KEY_BASED }
+        };
+    }
+
     @BeforeClass
     @Override
     protected void setup() throws Exception {
@@ -46,7 +64,7 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
-    @Test
+    @Test(priority = -1)
     public void testNamespaceDeduplicationApi() throws Exception {
         final String namespace = "my-property/my-ns";
         assertNull(admin.namespaces().getDeduplicationStatus(namespace));
@@ -174,9 +192,10 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
         producer.close();
     }
 
-    @Test(timeOut = 30000)
-    public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Exception {
-        String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId";
+    @Test(timeOut = 30000, dataProvider = "batchingTypes")
+    public void testProducerDeduplicationWithDiscontinuousSequenceId(BatcherBuilder batcherBuilder) throws Exception {
+        String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId-"
+                + System.currentTimeMillis();
         admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
 
         // Set infinite timeout
@@ -185,7 +204,9 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
                         .topic(topic)
                         .producerName("my-producer-name")
                         .enableBatching(true)
+                        .batcherBuilder(batcherBuilder)
                         .batchingMaxMessages(10)
+                        .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                         .sendTimeout(0, TimeUnit.SECONDS);
 
         Producer<byte[]> producer = producerBuilder.create();
@@ -208,7 +229,8 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
         producer.flush();
 
         for (int i = 0; i < 4; i++) {
-            Message<byte[]> msg = consumer.receive();
+            Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
+            assertNotNull(msg);
             assertEquals(new String(msg.getData()), "my-message-" + i);
             consumer.acknowledge(msg);
         }
@@ -284,4 +306,68 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
 
         producer.close();
     }
+
+    @Test(timeOut = 30000)
+    public void testKeyBasedBatchingOrder() throws Exception {
+        final String topic = "persistent://my-property/my-ns/test-key-based-batching-order";
+        admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+
+        final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe();
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .batcherBuilder(BatcherBuilder.KEY_BASED)
+                .batchingMaxMessages(100)
+                .batchingMaxBytes(1024 * 1024 * 5)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .create();
+        // | key | sequence id list |
+        // | :-- | :--------------- |
+        // | A | 0, 3, 4 |
+        // | B | 1, 2 |
+        final List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        sendFutures.add(producer.newMessage().key("A").value("msg-0").sequenceId(0L).sendAsync());
+        sendFutures.add(producer.newMessage().key("B").value("msg-1").sequenceId(1L).sendAsync());
+        sendFutures.add(producer.newMessage().key("B").value("msg-2").sequenceId(2L).sendAsync());
+        sendFutures.add(producer.newMessage().key("A").value("msg-3").sequenceId(3L).sendAsync());
+        sendFutures.add(producer.newMessage().key("A").value("msg-4").sequenceId(4L).sendAsync());
+        // The message order is expected to be [1, 2, 0, 3, 4]. The sequence ids are not ordered strictly, but:
+        // 1. The sequence ids for a given key are ordered.
+        // 2. The highest sequence ids of batches are ordered.
+        producer.flush();
+
+        FutureUtil.waitForAll(sendFutures);
+        final List<MessageId> sendMessageIds = sendFutures.stream().map(CompletableFuture::join)
+                .collect(Collectors.toList());
+        for (int i = 0; i < sendMessageIds.size(); i++) {
+            log.info("Send msg-{} to {}", i, sendMessageIds.get(i));
+        }
+
+        final List<Long> sequenceIdList = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            final Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            log.info("Received {}, key: {}, seq id: {}, msg id: {}",
+                    msg.getValue(), msg.getKey(), msg.getSequenceId(), msg.getMessageId());
+            assertNotNull(msg);
+            sequenceIdList.add(msg.getSequenceId());
+        }
+        assertEquals(sequenceIdList, Arrays.asList(1L, 2L, 0L, 3L, 4L));
+
+        for (int i = 0; i < 5; i++) {
+            // Currently sending a duplicated message won't throw an exception. Instead, an invalid result is returned.
+            final MessageId messageId = producer.newMessage().value("msg").sequenceId(i).send();
+            assertTrue(messageId instanceof BatchMessageIdImpl);
+            final BatchMessageIdImpl messageIdImpl = (BatchMessageIdImpl) messageId;
+            assertEquals(messageIdImpl.getLedgerId(), -1L);
+            assertEquals(messageIdImpl.getEntryId(), -1L);
+        }
+
+        consumer.close();
+        producer.close();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index cea567e8d68..996875a7131 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -19,13 +19,13 @@
 package org.apache.pulsar.client.impl;
 
 import com.google.common.collect.Lists;
-
 import io.netty.buffer.ByteBuf;
-
+import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
-
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -49,7 +49,11 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
 
     private MessageMetadata messageMetadata = new MessageMetadata();
     // sequence id for this batch which will be persisted as a single entry by broker
+    @Getter
+    @Setter
     private long lowestSequenceId = -1L;
+    @Getter
+    @Setter
     private long highestSequenceId = -1L;
     private ByteBuf batchedMessageMetadataAndPayload;
     private List<MessageImpl<?>> messages = Lists.newArrayList();
@@ -57,6 +61,14 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
     // keep track of callbacks for individual messages being published in a batch
     protected SendCallback firstCallback;
 
+    public BatchMessageContainerImpl() {
+    }
+
+    public BatchMessageContainerImpl(ProducerImpl<?> producer) {
+        this();
+        setProducer(producer);
+    }
+
     @Override
     public boolean add(MessageImpl<?> msg, SendCallback callback) {
 
@@ -82,10 +94,6 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
                 }
             } catch (Throwable e) {
                 log.error("construct first message failed, exception is ", e);
-                if (batchedMessageMetadataAndPayload != null) {
-                    // if payload has been allocated release it
-                    batchedMessageMetadataAndPayload.release();
-                }
                 discard(new PulsarClientException(e));
                 return false;
             }
@@ -104,7 +112,6 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
         }
         highestSequenceId = msg.getSequenceId();
         ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));
-
         return isBatchFull();
     }
 
@@ -172,6 +179,10 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
             if (firstCallback != null) {
                 firstCallback.sendComplete(ex);
             }
+            if (batchedMessageMetadataAndPayload != null) {
+                ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
+                batchedMessageMetadataAndPayload = null;
+            }
         } catch (Throwable t) {
             log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
                     lowestSequenceId, t);
@@ -193,6 +204,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
             return null;
         }
         messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
+        messageMetadata.setSequenceId(lowestSequenceId);
         messageMetadata.setHighestSequenceId(highestSequenceId);
         if (currentTxnidMostBits != -1) {
             messageMetadata.setTxnidMostBits(currentTxnidMostBits);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index 505ca75743c..77990eeeacb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -18,27 +18,12 @@
  */
 package org.apache.pulsar.client.impl;
 
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Lists;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.ReferenceCountUtil;
-
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
-import org.apache.pulsar.common.api.proto.CompressionType;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.compression.CompressionCodec;
-import org.apache.pulsar.common.protocol.ByteBufPair;
-import org.apache.pulsar.common.protocol.Commands;
+import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +38,7 @@ import org.slf4j.LoggerFactory;
  */
 class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
 
-    private Map<String, KeyedBatch> batches = new HashMap<>();
+    private final Map<String, BatchMessageContainerImpl> batches = new HashMap<>();
 
     @Override
     public boolean add(MessageImpl<?> msg, SendCallback callback) {
@@ -61,29 +46,16 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
             log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName, producerName,
                     numMessagesInBatch);
         }
-        numMessagesInBatch++;
-        currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
         String key = getKey(msg);
-        KeyedBatch part = batches.get(key);
-        if (part == null) {
-            part = new KeyedBatch();
-            part.addMsg(msg, callback);
-            part.compressionType = compressionType;
-            part.compressor = compressor;
-            part.maxBatchSize = maxBatchSize;
-            part.topicName = topicName;
-            part.producerName = producerName;
-            batches.putIfAbsent(key, part);
-
-            if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {
-                currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
-            }
-            if (msg.getMessageBuilder().hasTxnidLeastBits() && currentTxnidLeastBits == -1) {
-                currentTxnidLeastBits = msg.getMessageBuilder().getTxnidLeastBits();
-            }
-
-        } else {
-            part.addMsg(msg, callback);
+        final BatchMessageContainerImpl batchMessageContainer = batches.computeIfAbsent(key,
+                __ -> new BatchMessageContainerImpl(producer));
+        batchMessageContainer.add(msg, callback);
+        // The `add` method fails iff the container is empty, i.e. the `msg` is the first message to add, while `msg`
+        // was failed to add. In this case, `clear` method will be called and the batch container is empty and there is
+        // no need to update the stats.
+        if (!batchMessageContainer.isEmpty()) {
+            numMessagesInBatch++;
+            currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
         }
         return isBatchFull();
     }
@@ -92,7 +64,7 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
     public void clear() {
         numMessagesInBatch = 0;
         currentBatchSizeBytes = 0;
-        batches = new HashMap<>();
+        batches.clear();
         currentTxnidMostBits = -1L;
         currentTxnidLeastBits = -1L;
     }
@@ -104,13 +76,7 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
 
     @Override
     public void discard(Exception ex) {
-        try {
-            // Need to protect ourselves from any exception being thrown in the future handler from the application
-            batches.forEach((k, v) -> v.firstCallback.sendComplete(ex));
-        } catch (Throwable t) {
-            log.warn("[{}] [{}] Got exception while completing the callback", topicName, producerName, t);
-        }
-        batches.forEach((k, v) -> ReferenceCountUtil.safeRelease(v.batchedMessageMetadataAndPayload));
+        batches.forEach((k, v) -> v.discard(ex));
         clear();
     }
 
@@ -119,64 +85,45 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
         return true;
     }
 
-    private ProducerImpl.OpSendMsg createOpSendMsg(KeyedBatch keyedBatch) throws IOException {
-        ByteBuf encryptedPayload = producer.encryptMessage(keyedBatch.messageMetadata, keyedBatch.getCompressedBatchMetadataAndPayload());
-        if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
-            keyedBatch.discard(new PulsarClientException.InvalidMessageException(
-                    "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
-            return null;
-        }
-
-        final int numMessagesInBatch = keyedBatch.messages.size();
-        long currentBatchSizeBytes = 0;
-        for (MessageImpl<?> message : keyedBatch.messages) {
-            currentBatchSizeBytes += message.getDataBuffer().readableBytes();
-        }
-        keyedBatch.messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
-        if (currentTxnidMostBits != -1) {
-            keyedBatch.messageMetadata.setTxnidMostBits(currentTxnidMostBits);
-        }
-        if (currentTxnidLeastBits != -1) {
-            keyedBatch.messageMetadata.setTxnidLeastBits(currentTxnidLeastBits);
-        }
-        ByteBufPair cmd = producer.sendMessage(producer.producerId, keyedBatch.sequenceId, numMessagesInBatch,
-                keyedBatch.messageMetadata, encryptedPayload);
-
-        ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create(keyedBatch.messages, cmd, keyedBatch.sequenceId, keyedBatch.firstCallback);
-
-        op.setNumMessagesInBatch(numMessagesInBatch);
-        op.setBatchSizeByte(currentBatchSizeBytes);
-        return op;
-    }
-
     @Override
     public List<ProducerImpl.OpSendMsg> createOpSendMsgs() throws IOException {
-        List<ProducerImpl.OpSendMsg> result = new ArrayList<>();
-        List<KeyedBatch> list = new ArrayList<>(batches.values());
-        list.sort(((o1, o2) -> ComparisonChain.start()
-                .compare(o1.sequenceId, o2.sequenceId)
-                .result()));
-        for (KeyedBatch keyedBatch : list) {
-            ProducerImpl.OpSendMsg op = createOpSendMsg(keyedBatch);
-            if (op != null) {
-                result.add(op);
+        try {
+            // In key based batching, the sequence ids might not be ordered, for example,
+            // | key | sequence id list |
+            // | :-- | :--------------- |
+            // | A | 0, 3, 4 |
+            // | B | 1, 2 |
+            // The message order should be 1, 2, 0, 3, 4 so that a message with a sequence id <= 4 should be dropped.
+            // However, for a MessageMetadata with both `sequence_id` and `highest_sequence_id` fields, the broker will
+            // expect a strict order so that the batch of key "A" (0, 3, 4) will be dropped.
+            // Therefore, we should update the `sequence_id` field to the highest sequence id and remove the
+            // `highest_sequence_id` field to allow the weak order.
+            batches.values().forEach(batchMessageContainer -> {
+                batchMessageContainer.setLowestSequenceId(batchMessageContainer.getHighestSequenceId());
+            });
+            return batches.values().stream().sorted((o1, o2) ->
+                    (int) (o1.getLowestSequenceId() - o2.getLowestSequenceId())
+            ).map(batchMessageContainer -> {
+                try {
+                    return batchMessageContainer.createOpSendMsg();
+                } catch (IOException e) {
+                    throw new IllegalStateException(e);
+                }
+            }).collect(Collectors.toList());
+        } catch (IllegalStateException e) {
+            if (e.getCause() instanceof IOException) {
+                throw (IOException) e.getCause();
+            } else {
+                throw e;
             }
         }
-        return result;
     }
 
     @Override
     public boolean hasSameSchema(MessageImpl<?> msg) {
         String key = getKey(msg);
-        KeyedBatch part = batches.get(key);
-        if (part == null || part.messages.isEmpty()) {
-            return true;
-        }
-        if (!part.messageMetadata.hasSchemaVersion()) {
-            return msg.getSchemaVersion() == null;
-        }
-        return Arrays.equals(msg.getSchemaVersion(),
-                             part.messageMetadata.getSchemaVersion());
+        BatchMessageContainerImpl batchMessageContainer = batches.get(key);
+        return batchMessageContainer == null || batchMessageContainer.hasSameSchema(msg);
     }
 
     private String getKey(MessageImpl<?> msg) {
@@ -186,78 +133,6 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
         return msg.getKey();
     }
 
-    private static class KeyedBatch {
-        private final MessageMetadata messageMetadata = new MessageMetadata();
-        // sequence id for this batch which will be persisted as a single entry by broker
-        private long sequenceId = -1;
-        private ByteBuf batchedMessageMetadataAndPayload;
-        private List<MessageImpl<?>> messages = Lists.newArrayList();
-        private SendCallback previousCallback = null;
-        private CompressionType compressionType;
-        private CompressionCodec compressor;
-        private int maxBatchSize;
-        private String topicName;
-        private String producerName;
-
-        // keep track of callbacks for individual messages being published in a batch
-        private SendCallback firstCallback;
-
-        private ByteBuf getCompressedBatchMetadataAndPayload() {
-            for (MessageImpl<?> msg : messages) {
-                batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msg.getMessageBuilder(),
-                        msg.getDataBuffer(), batchedMessageMetadataAndPayload);
-            }
-            int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
-            ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
-            batchedMessageMetadataAndPayload.release();
-            if (compressionType != CompressionType.NONE) {
-                messageMetadata.setCompression(compressionType);
-                messageMetadata.setUncompressedSize(uncompressedSize);
-            }
-
-            // Update the current max batch size using the uncompressed size, which is what we need in any case to
-            // accumulate the batch content
-            maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
-            return compressedPayload;
-        }
-
-        private void addMsg(MessageImpl<?> msg, SendCallback callback) {
-            if (messages.size() == 0) {
-                sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
-                batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
-                        .buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
-                firstCallback = callback;
-            }
-            if (previousCallback != null) {
-                previousCallback.addCallback(msg, callback);
-            }
-            previousCallback = callback;
-            messages.add(msg);
-        }
-
-        public void discard(Exception ex) {
-            try {
-                // Need to protect ourselves from any exception being thrown in the future handler from the application
-                if (firstCallback != null) {
-                    firstCallback.sendComplete(ex);
-                }
-            } catch (Throwable t) {
-                log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
-                        sequenceId, t);
-            }
-            clear();
-        }
-
-        public void clear() {
-            messages = Lists.newArrayList();
-            firstCallback = null;
-            previousCallback = null;
-            messageMetadata.clear();
-            sequenceId = -1;
-            batchedMessageMetadataAndPayload = null;
-        }
-    }
-
     private static final Logger log = LoggerFactory.getLogger(BatchMessageKeyBasedContainer.class);
 
 }