You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/01 02:11:23 UTC
[pulsar] branch master updated: [Transaction] Support produce batch
transaction messages (#8415)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f6adf91 [Transaction] Support produce batch transaction messages (#8415)
f6adf91 is described below
commit f6adf91fb063a75f6ffbf8194b154e2471af043e
Author: ran <ga...@126.com>
AuthorDate: Sun Nov 1 10:11:03 2020 +0800
[Transaction] Support produce batch transaction messages (#8415)
### Motivation
Currently, Pulsar only supports produce transaction messages in no batch mode.
### Modifications
Add transaction id in the batch message container, if the newly added message has a different transaction id with the container, it will be added in the next batch.
### Verifying this change
add a new unit test.
**org.apache.pulsar.client.impl.TransactionEndToEndTest#batchProduceCommitTest**
---
.../client/impl/TransactionEndToEndTest.java | 55 ++++++----------------
.../client/impl/AbstractBatchMessageContainer.java | 19 ++++++++
.../client/impl/BatchMessageContainerBase.java | 8 ++++
.../client/impl/BatchMessageContainerImpl.java | 15 +++++-
.../client/impl/BatchMessageKeyBasedContainer.java | 17 ++++++-
.../apache/pulsar/client/impl/ProducerImpl.java | 3 +-
6 files changed, 73 insertions(+), 44 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index efd55f9..35c3ddb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -91,42 +91,13 @@ public class TransactionEndToEndTest extends TransactionTestBase {
}
@Test
- public void produceTest() throws Exception {
- String topic = NAMESPACE1 + "/txn-test";
-
- @Cleanup
- Consumer<byte[]> consumer = pulsarClient
- .newConsumer()
- .topic(topic)
- .subscriptionName("test")
- .subscribe();
-
- @Cleanup
- ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient
- .newProducer()
- .topic(topic)
- .enableBatching(false)
- .sendTimeout(0, TimeUnit.SECONDS)
- .create();
-
- Transaction txn = getTxn();
-
- for (int i = 0; i < 10; i++) {
- producer.newMessage(txn).value("Hello".getBytes()).sendAsync();
- }
-
- txn.commit().get();
-
- for (int i = 0; i < 10; i++) {
- Message<byte[]> message = consumer.receive();
- log.info("receive msg: {}", new String(message.getData()));
- }
-
+ public void noBatchProduceCommitTest() throws Exception {
+ produceCommitTest(false);
}
@Test
- public void noBatchProduceCommitTest() throws Exception {
- produceCommitTest(false);
+ public void batchProduceCommitTest() throws Exception {
+ produceCommitTest(true);
}
private void produceCommitTest(boolean enableBatch) throws Exception {
@@ -149,12 +120,16 @@ public class TransactionEndToEndTest extends TransactionTestBase {
Transaction txn1 = getTxn();
Transaction txn2 = getTxn();
- int messageCnt = 20;
+ int txn1MessageCnt = 0;
+ int txn2MessageCnt = 0;
+ int messageCnt = 1000;
for (int i = 0; i < messageCnt; i++) {
- if (i % 2 == 0) {
+ if (i % 5 == 0) {
producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
+ txn1MessageCnt ++;
} else {
producer.newMessage(txn2).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
+ txn2MessageCnt ++;
}
}
@@ -166,13 +141,12 @@ public class TransactionEndToEndTest extends TransactionTestBase {
// txn1 messages could be received after txn1 committed
int receiveCnt = 0;
- for (int i = 0; i < messageCnt / 2; i++) {
+ for (int i = 0; i < txn1MessageCnt; i++) {
message = consumer.receive();
Assert.assertNotNull(message);
- log.info("receive msgId: {}, msg: {}", message.getMessageId(), new String(message.getData(), UTF_8));
receiveCnt ++;
}
- Assert.assertEquals(messageCnt / 2, receiveCnt);
+ Assert.assertEquals(txn1MessageCnt, receiveCnt);
message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(message);
@@ -181,13 +155,12 @@ public class TransactionEndToEndTest extends TransactionTestBase {
// txn2 messages could be received after txn2 committed
receiveCnt = 0;
- for (int i = 0; i < messageCnt / 2; i++) {
+ for (int i = 0; i < txn2MessageCnt; i++) {
message = consumer.receive();
Assert.assertNotNull(message);
- log.info("receive second msgId: {}, msg: {}", message.getMessageId(), new String(message.getData(), UTF_8));
receiveCnt ++;
}
- Assert.assertEquals(messageCnt / 2, receiveCnt);
+ Assert.assertEquals(txn2MessageCnt, receiveCnt);
message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(message);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
index 670ed1c..633c260 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
@@ -28,6 +29,7 @@ import java.util.List;
/**
* Batch message container framework.
*/
+@Slf4j
public abstract class AbstractBatchMessageContainer implements BatchMessageContainerBase {
protected PulsarApi.CompressionType compressionType;
@@ -41,6 +43,9 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
protected int numMessagesInBatch = 0;
protected long currentBatchSizeBytes = 0;
+ protected long currentTxnidMostBits = -1L;
+ protected long currentTxnidLeastBits = -1L;
+
protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
// This will be the largest size for a batch sent from this particular producer. This is used as a baseline to
@@ -93,4 +98,18 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
this.maxNumMessagesInBatch = producer.getConfiguration().getBatchingMaxMessages();
this.maxBytesInBatch = producer.getConfiguration().getBatchingMaxBytes();
}
+
+ @Override
+ public boolean hasSameTxn(MessageImpl<?> msg) {
+ if (!msg.getMessageBuilder().hasTxnidMostBits() || !msg.getMessageBuilder().hasTxnidLeastBits()) {
+ return true;
+ }
+ if (currentTxnidMostBits == -1 || currentTxnidLeastBits == -1) {
+ currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
+ currentTxnidLeastBits = msg.getMessageBuilder().getTxnidLeastBits();
+ return true;
+ }
+ return currentTxnidMostBits == msg.getMessageBuilder().getTxnidMostBits()
+ && currentTxnidLeastBits == msg.getMessageBuilder().getTxnidLeastBits();
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
index d444e5b..2462a40 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerBase.java
@@ -75,4 +75,12 @@ public interface BatchMessageContainerBase extends BatchMessageContainer {
* @throws IOException
*/
OpSendMsg createOpSendMsg() throws IOException;
+
+ /**
+ * Check whether the added message belong to the same txn with batch message container.
+ *
+ * @param msg added message
+ * @return belong to the same txn or not
+ */
+ boolean hasSameTxn(MessageImpl<?> msg);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 937a3e9..15bac6d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
-import io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -74,6 +73,12 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
this.firstCallback = callback;
batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
.buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
+ if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {
+ currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
+ }
+ if (msg.getMessageBuilder().hasTxnidLeastBits() && currentTxnidLeastBits == -1) {
+ currentTxnidLeastBits = msg.getMessageBuilder().getTxnidLeastBits();
+ }
}
if (previousCallback != null) {
@@ -145,6 +150,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
lowestSequenceId = -1L;
highestSequenceId = -1L;
batchedMessageMetadataAndPayload = null;
+ currentTxnidMostBits = -1L;
+ currentTxnidLeastBits = -1L;
}
@Override
@@ -181,6 +188,12 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
}
messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
messageMetadata.setHighestSequenceId(highestSequenceId);
+ if (currentTxnidMostBits != -1) {
+ messageMetadata.setTxnidMostBits(currentTxnidMostBits);
+ }
+ if (currentTxnidLeastBits != -1) {
+ messageMetadata.setTxnidLeastBits(currentTxnidLeastBits);
+ }
ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), numMessagesInBatch, messageMetadata.build(), encryptedPayload);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index d9c1c6c..0fd3c5a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -28,7 +28,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +71,14 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
part.topicName = topicName;
part.producerName = producerName;
batches.putIfAbsent(key, part);
+
+ if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {
+ currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
+ }
+ if (msg.getMessageBuilder().hasTxnidLeastBits() && currentTxnidLeastBits == -1) {
+ currentTxnidLeastBits = msg.getMessageBuilder().getTxnidLeastBits();
+ }
+
} else {
part.addMsg(msg, callback);
}
@@ -83,6 +90,8 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
numMessagesInBatch = 0;
currentBatchSizeBytes = 0;
batches = new HashMap<>();
+ currentTxnidMostBits = -1L;
+ currentTxnidLeastBits = -1L;
}
@Override
@@ -121,6 +130,12 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
currentBatchSizeBytes += message.getDataBuffer().readableBytes();
}
keyedBatch.messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
+ if (currentTxnidMostBits != -1) {
+ keyedBatch.messageMetadata.setTxnidMostBits(currentTxnidMostBits);
+ }
+ if (currentTxnidLeastBits != -1) {
+ keyedBatch.messageMetadata.setTxnidLeastBits(currentTxnidLeastBits);
+ }
ByteBufPair cmd = producer.sendMessage(producer.producerId, keyedBatch.sequenceId, numMessagesInBatch,
keyedBatch.messageMetadata.build(), encryptedPayload);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index afc0195..6cc79a8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -671,7 +671,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
return batchMessageContainer.haveEnoughSpace(msg)
- && (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg));
+ && (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg))
+ && batchMessageContainer.hasSameTxn(msg);
}
private void doBatchSendAndAdd(MessageImpl<?> msg, SendCallback callback, ByteBuf payload) {