You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/01 02:11:23 UTC

[pulsar] branch master updated: [Transaction] Support produce batch transaction messages (#8415)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f6adf91  [Transaction] Support produce batch transaction messages (#8415)
f6adf91 is described below

commit f6adf91fb063a75f6ffbf8194b154e2471af043e
Author: ran <ga...@126.com>
AuthorDate: Sun Nov 1 10:11:03 2020 +0800

    [Transaction] Support produce batch transaction messages (#8415)
    
    ### Motivation
    
    Currently, Pulsar only supports producing transaction messages in non-batch mode.
    
    ### Modifications
    
    Add the transaction ID to the batch message container. If a newly added message has a different transaction ID from the one held by the container, it will be added to the next batch.
    
    ### Verifying this change
    
    Add a new unit test.
    
    **org.apache.pulsar.client.impl.TransactionEndToEndTest#batchProduceCommitTest**
---
 .../client/impl/TransactionEndToEndTest.java       | 55 ++++++----------------
 .../client/impl/AbstractBatchMessageContainer.java | 19 ++++++++
 .../client/impl/BatchMessageContainerBase.java     |  8 ++++
 .../client/impl/BatchMessageContainerImpl.java     | 15 +++++-
 .../client/impl/BatchMessageKeyBasedContainer.java | 17 ++++++-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  3 +-
 6 files changed, 73 insertions(+), 44 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index efd55f9..35c3ddb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -91,42 +91,13 @@ public class TransactionEndToEndTest extends TransactionTestBase {
     }
 
     @Test
-    public void produceTest() throws Exception {
-        String topic = NAMESPACE1 + "/txn-test";
-
-        @Cleanup
-        Consumer<byte[]> consumer = pulsarClient
-                .newConsumer()
-                .topic(topic)
-                .subscriptionName("test")
-                .subscribe();
-
-        @Cleanup
-        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient
-                .newProducer()
-                .topic(topic)
-                .enableBatching(false)
-                .sendTimeout(0, TimeUnit.SECONDS)
-                .create();
-
-        Transaction txn = getTxn();
-
-        for (int i = 0; i < 10; i++) {
-            producer.newMessage(txn).value("Hello".getBytes()).sendAsync();
-        }
-
-        txn.commit().get();
-
-        for (int i = 0; i < 10; i++) {
-            Message<byte[]> message = consumer.receive();
-            log.info("receive msg: {}", new String(message.getData()));
-        }
-
+    public void noBatchProduceCommitTest() throws Exception {
+        produceCommitTest(false);
     }
 
     @Test
-    public void noBatchProduceCommitTest() throws Exception {
-        produceCommitTest(false);
+    public void batchProduceCommitTest() throws Exception {
+        produceCommitTest(true);
     }
 
     private void produceCommitTest(boolean enableBatch) throws Exception {
@@ -149,12 +120,16 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         Transaction txn1 = getTxn();
         Transaction txn2 = getTxn();
 
-        int messageCnt = 20;
+        int txn1MessageCnt = 0;
+        int txn2MessageCnt = 0;
+        int messageCnt = 1000;
         for (int i = 0; i < messageCnt; i++) {
-            if (i % 2 == 0) {
+            if (i % 5 == 0) {
                 producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
+                txn1MessageCnt ++;
             } else {
                 producer.newMessage(txn2).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
+                txn2MessageCnt ++;
             }
         }
 
@@ -166,13 +141,12 @@ public class TransactionEndToEndTest extends TransactionTestBase {
 
         // txn1 messages could be received after txn1 committed
         int receiveCnt = 0;
-        for (int i = 0; i < messageCnt / 2; i++) {
+        for (int i = 0; i < txn1MessageCnt; i++) {
             message = consumer.receive();
             Assert.assertNotNull(message);
-            log.info("receive msgId: {}, msg: {}", message.getMessageId(), new String(message.getData(), UTF_8));
             receiveCnt ++;
         }
-        Assert.assertEquals(messageCnt / 2, receiveCnt);
+        Assert.assertEquals(txn1MessageCnt, receiveCnt);
 
         message = consumer.receive(5, TimeUnit.SECONDS);
         Assert.assertNull(message);
@@ -181,13 +155,12 @@ public class TransactionEndToEndTest extends TransactionTestBase {
 
         // txn2 messages could be received after txn2 committed
         receiveCnt = 0;
-        for (int i = 0; i < messageCnt / 2; i++) {
+        for (int i = 0; i < txn2MessageCnt; i++) {
             message = consumer.receive();
             Assert.assertNotNull(message);
-            log.info("receive second msgId: {}, msg: {}", message.getMessageId(), new String(message.getData(), UTF_8));
             receiveCnt ++;
         }
-        Assert.assertEquals(messageCnt / 2, receiveCnt);
+        Assert.assertEquals(txn2MessageCnt, receiveCnt);
 
         message = consumer.receive(5, TimeUnit.SECONDS);
         Assert.assertNull(message);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
index 670ed1c..633c260 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
@@ -28,6 +29,7 @@ import java.util.List;
 /**
  * Batch message container framework.
  */
+@Slf4j
 public abstract class AbstractBatchMessageContainer implements BatchMessageContainerBase {
 
     protected PulsarApi.CompressionType compressionType;
@@ -41,6 +43,9 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
     protected int numMessagesInBatch = 0;
     protected long currentBatchSizeBytes = 0;
 
+    protected long currentTxnidMostBits = -1L;
+    protected long currentTxnidLeastBits = -1L;
+
     protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
 
     // This will be the largest size for a batch sent from this particular producer. This is used as a baseline to
@@ -93,4 +98,18 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
         this.maxNumMessagesInBatch = producer.getConfiguration().getBatchingMaxMessages();
         this.maxBytesInBatch = producer.getConfiguration().getBatchingMaxBytes();
     }
+
+    @Override
+    public boolean hasSameTxn(MessageImpl<?> msg) {
+        if (!msg.getMessageBuilder().hasTxnidMostBits() || !msg.getMessageBuilder().hasTxnidLeastBits()) {
+            return true;
+        }
+        if (currentTxnidMostBits == -1 || currentTxnidLeastBits == -1) {
+            currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
+            currentTxnidLeastBits = msg.getMessageBuilder().getTxnidLeastBits();
+            return true;
+        }
+        return currentTxnidMostBits == msg.getMessageBuilder().getTxnidMostBits()
+                && currentTxnidLeastBits == msg.getMessageBuilder().getTxnidLeastBits();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
index d444e5b..2462a40 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
@@ -75,4 +75,12 @@ public interface BatchMessageContainerBase extends BatchMessageContainer {
      * @throws IOException
      */
     OpSendMsg createOpSendMsg() throws IOException;
+
+    /**
+     * Check whether the added message belongs to the same txn as the batch message container.
+     *
+     * @param msg added message
+     * @return belong to the same txn or not
+     */
+    boolean hasSameTxn(MessageImpl<?> msg);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 937a3e9..15bac6d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
-import io.netty.util.ReferenceCountUtil;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -74,6 +73,12 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
             this.firstCallback = callback;
             batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
                     .buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
+            if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {
+                currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
+            }
+            if (msg.getMessageBuilder().hasTxnidLeastBits() && currentTxnidLeastBits == -1) {
+                currentTxnidLeastBits = msg.getMessageBuilder().getTxnidLeastBits();
+            }
         }
 
         if (previousCallback != null) {
@@ -145,6 +150,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
         lowestSequenceId = -1L;
         highestSequenceId = -1L;
         batchedMessageMetadataAndPayload = null;
+        currentTxnidMostBits = -1L;
+        currentTxnidLeastBits = -1L;
     }
 
     @Override
@@ -181,6 +188,12 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
         }
         messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
         messageMetadata.setHighestSequenceId(highestSequenceId);
+        if (currentTxnidMostBits != -1) {
+            messageMetadata.setTxnidMostBits(currentTxnidMostBits);
+        }
+        if (currentTxnidLeastBits != -1) {
+            messageMetadata.setTxnidLeastBits(currentTxnidLeastBits);
+        }
         ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(),
                 messageMetadata.getHighestSequenceId(), numMessagesInBatch, messageMetadata.build(), encryptedPayload);
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index d9c1c6c..0fd3c5a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -28,7 +28,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,6 +71,14 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
             part.topicName = topicName;
             part.producerName = producerName;
             batches.putIfAbsent(key, part);
+
+            if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {
+                currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
+            }
+            if (msg.getMessageBuilder().hasTxnidLeastBits() && currentTxnidLeastBits == -1) {
+                currentTxnidLeastBits = msg.getMessageBuilder().getTxnidLeastBits();
+            }
+
         } else {
             part.addMsg(msg, callback);
         }
@@ -83,6 +90,8 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
         numMessagesInBatch = 0;
         currentBatchSizeBytes = 0;
         batches = new HashMap<>();
+        currentTxnidMostBits = -1L;
+        currentTxnidLeastBits = -1L;
     }
 
     @Override
@@ -121,6 +130,12 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
             currentBatchSizeBytes += message.getDataBuffer().readableBytes();
         }
         keyedBatch.messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
+        if (currentTxnidMostBits != -1) {
+            keyedBatch.messageMetadata.setTxnidMostBits(currentTxnidMostBits);
+        }
+        if (currentTxnidLeastBits != -1) {
+            keyedBatch.messageMetadata.setTxnidLeastBits(currentTxnidLeastBits);
+        }
         ByteBufPair cmd = producer.sendMessage(producer.producerId, keyedBatch.sequenceId, numMessagesInBatch,
                 keyedBatch.messageMetadata.build(), encryptedPayload);
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index afc0195..6cc79a8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -671,7 +671,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
     private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
         return batchMessageContainer.haveEnoughSpace(msg)
-               && (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg));
+               && (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg))
+                && batchMessageContainer.hasSameTxn(msg);
     }
 
     private void doBatchSendAndAdd(MessageImpl<?> msg, SendCallback callback, ByteBuf payload) {