You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/27 05:37:29 UTC

[pulsar] branch branch-2.8 updated (bb8c9456da4 -> 43cd22e632d)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from bb8c9456da4 [improve][security] Add load multiple certificates support in TrustManagerProxy (#14798)
     new dce262da868 Fix producerFuture not complete in ServerCnx#handleProducer (#14467)
     new d5cee50635b [cleanup] [broker] Remove useless code to avoid confusion in OpReadEntry#checkReadCompletion. (#15104)
     new f49047debfe [C++] Fix send callback might not be invoked in key based batching (#14898)
     new d046a6ddc93 [fix][broker] Fix MultiRolesTokenAuthorizationProvider `authorize` issue. (#15454)
     new 70afb39f2e4 [C++] Remove the flaky and meaningless tests (#15271)
     new 43cd22e632d [Java Client] Fix wrong behavior of deduplication for key based batching (#15413)

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/OpReadEntry.java       |   5 -
 .../MultiRolesTokenAuthorizationProvider.java      |  36 +---
 .../apache/pulsar/broker/service/ServerCnx.java    |   8 +
 .../pulsar/client/api/ClientDeduplicationTest.java |  98 +++++++++-
 pulsar-client-cpp/lib/BatchMessageContainerBase.h  |  26 +++
 pulsar-client-cpp/lib/ProducerImpl.cc              |  47 ++---
 pulsar-client-cpp/tests/ClientTest.cc              |   2 +-
 pulsar-client-cpp/tests/KeyBasedBatchingTest.cc    |  32 +++-
 pulsar-client-cpp/tests/ProducerTest.cc            |  85 +--------
 .../client/impl/BatchMessageContainerImpl.java     |  27 ++-
 .../client/impl/BatchMessageKeyBasedContainer.java | 211 +++++----------------
 .../org/apache/pulsar/common/util/FutureUtil.java  |  44 +++++
 .../apache/pulsar/common/util/FutureUtilTest.java  |  53 +++++-
 13 files changed, 334 insertions(+), 340 deletions(-)


[pulsar] 06/06: [Java Client] Fix wrong behavior of deduplication for key based batching (#15413)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 43cd22e632d1938b2c5ee7cd860fc46d20fed921
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri May 13 20:18:42 2022 +0800

    [Java Client] Fix wrong behavior of deduplication for key based batching (#15413)
    
    ### Motivation
    
    Currently message deduplication doesn't work well for key based
    batching. First, the key based batch container doesn't update the
    `lastSequenceIdPushed`. So a batch could contain both duplicated and not
    duplicated messages. Second, when `createOpSendMsgs` is called, the
    `OpSendMsg` objects are sorted by the lowest sequence ids, and the
    highest sequence id is not set. If a batch contains sequence id 0,1,2,
    then the message with sequence id 1 or 2 won't be dropped.
    
    ### Modifications
    
    - Refactor the key based batch container that the
      `BatchMessageContainerImpl` is reused instead of maintaining a
      `KeyedBatch` class.
    - When `createOpSendMsgs` is called, clear the highest sequence id field
      and configure the sequence id field with the highest sequence id to fix
      the second issue described before.
    - Add `testKeyBasedBatchingOrder` to show and verify the current
      behavior.
    - Add test for key based batching into
      `testProducerDeduplicationWithDiscontinuousSequenceId` to verify
      `lastSlastSequenceIdPushed` is updated correctly.
    
    (cherry picked from commit a77333705ffb352da39767d53975353bb4f8864e)
---
 .../pulsar/client/api/ClientDeduplicationTest.java |  98 +++++++++-
 .../client/impl/BatchMessageContainerImpl.java     |  27 ++-
 .../client/impl/BatchMessageKeyBasedContainer.java | 211 +++++----------------
 3 files changed, 155 insertions(+), 181 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index 304bb6eaaa0..52017444a2b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -20,19 +20,37 @@ package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+@Slf4j
 @Test(groups = "flaky")
 public class ClientDeduplicationTest extends ProducerConsumerBase {
 
+    @DataProvider
+    public static Object[][] batchingTypes() {
+        return new Object[][] {
+                { BatcherBuilder.DEFAULT },
+                { BatcherBuilder.KEY_BASED }
+        };
+    }
+
     @BeforeClass
     @Override
     protected void setup() throws Exception {
@@ -46,7 +64,7 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
-    @Test
+    @Test(priority = -1)
     public void testNamespaceDeduplicationApi() throws Exception {
         final String namespace = "my-property/my-ns";
         assertNull(admin.namespaces().getDeduplicationStatus(namespace));
@@ -174,9 +192,10 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
         producer.close();
     }
 
-    @Test(timeOut = 30000)
-    public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Exception {
-        String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId";
+    @Test(timeOut = 30000, dataProvider = "batchingTypes")
+    public void testProducerDeduplicationWithDiscontinuousSequenceId(BatcherBuilder batcherBuilder) throws Exception {
+        String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId-"
+                + System.currentTimeMillis();
         admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
 
         // Set infinite timeout
@@ -185,7 +204,9 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
                         .topic(topic)
                         .producerName("my-producer-name")
                         .enableBatching(true)
+                        .batcherBuilder(batcherBuilder)
                         .batchingMaxMessages(10)
+                        .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                         .sendTimeout(0, TimeUnit.SECONDS);
 
         Producer<byte[]> producer = producerBuilder.create();
@@ -208,7 +229,8 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
         producer.flush();
 
         for (int i = 0; i < 4; i++) {
-            Message<byte[]> msg = consumer.receive();
+            Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
+            assertNotNull(msg);
             assertEquals(new String(msg.getData()), "my-message-" + i);
             consumer.acknowledge(msg);
         }
@@ -284,4 +306,68 @@ public class ClientDeduplicationTest extends ProducerConsumerBase {
 
         producer.close();
     }
+
+    @Test(timeOut = 30000)
+    public void testKeyBasedBatchingOrder() throws Exception {
+        final String topic = "persistent://my-property/my-ns/test-key-based-batching-order";
+        admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+
+        final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe();
+        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .batcherBuilder(BatcherBuilder.KEY_BASED)
+                .batchingMaxMessages(100)
+                .batchingMaxBytes(1024 * 1024 * 5)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .create();
+        // | key | sequence id list |
+        // | :-- | :--------------- |
+        // | A | 0, 3, 4 |
+        // | B | 1, 2 |
+        final List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        sendFutures.add(producer.newMessage().key("A").value("msg-0").sequenceId(0L).sendAsync());
+        sendFutures.add(producer.newMessage().key("B").value("msg-1").sequenceId(1L).sendAsync());
+        sendFutures.add(producer.newMessage().key("B").value("msg-2").sequenceId(2L).sendAsync());
+        sendFutures.add(producer.newMessage().key("A").value("msg-3").sequenceId(3L).sendAsync());
+        sendFutures.add(producer.newMessage().key("A").value("msg-4").sequenceId(4L).sendAsync());
+        // The message order is expected to be [1, 2, 0, 3, 4]. The sequence ids are not ordered strictly, but:
+        // 1. The sequence ids for a given key are ordered.
+        // 2. The highest sequence ids of batches are ordered.
+        producer.flush();
+
+        FutureUtil.waitForAll(sendFutures);
+        final List<MessageId> sendMessageIds = sendFutures.stream().map(CompletableFuture::join)
+                .collect(Collectors.toList());
+        for (int i = 0; i < sendMessageIds.size(); i++) {
+            log.info("Send msg-{} to {}", i, sendMessageIds.get(i));
+        }
+
+        final List<Long> sequenceIdList = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            final Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            log.info("Received {}, key: {}, seq id: {}, msg id: {}",
+                    msg.getValue(), msg.getKey(), msg.getSequenceId(), msg.getMessageId());
+            assertNotNull(msg);
+            sequenceIdList.add(msg.getSequenceId());
+        }
+        assertEquals(sequenceIdList, Arrays.asList(1L, 2L, 0L, 3L, 4L));
+
+        for (int i = 0; i < 5; i++) {
+            // Currently sending a duplicated message won't throw an exception. Instead, an invalid result is returned.
+            final MessageId messageId = producer.newMessage().value("msg").sequenceId(i).send();
+            assertTrue(messageId instanceof BatchMessageIdImpl);
+            final BatchMessageIdImpl messageIdImpl = (BatchMessageIdImpl) messageId;
+            assertEquals(messageIdImpl.getLedgerId(), -1L);
+            assertEquals(messageIdImpl.getEntryId(), -1L);
+        }
+
+        consumer.close();
+        producer.close();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index cea567e8d68..5811db76a64 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -21,11 +21,12 @@ package org.apache.pulsar.client.impl;
 import com.google.common.collect.Lists;
 
 import io.netty.buffer.ByteBuf;
-
+import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
-
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -49,7 +50,11 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
 
     private MessageMetadata messageMetadata = new MessageMetadata();
     // sequence id for this batch which will be persisted as a single entry by broker
+    @Getter
+    @Setter
     private long lowestSequenceId = -1L;
+    @Getter
+    @Setter
     private long highestSequenceId = -1L;
     private ByteBuf batchedMessageMetadataAndPayload;
     private List<MessageImpl<?>> messages = Lists.newArrayList();
@@ -57,6 +62,14 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
     // keep track of callbacks for individual messages being published in a batch
     protected SendCallback firstCallback;
 
+    public BatchMessageContainerImpl() {
+    }
+
+    public BatchMessageContainerImpl(ProducerImpl<?> producer) {
+        this();
+        setProducer(producer);
+    }
+
     @Override
     public boolean add(MessageImpl<?> msg, SendCallback callback) {
 
@@ -82,10 +95,6 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
                 }
             } catch (Throwable e) {
                 log.error("construct first message failed, exception is ", e);
-                if (batchedMessageMetadataAndPayload != null) {
-                    // if payload has been allocated release it
-                    batchedMessageMetadataAndPayload.release();
-                }
                 discard(new PulsarClientException(e));
                 return false;
             }
@@ -104,7 +113,6 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
         }
         highestSequenceId = msg.getSequenceId();
         ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));
-
         return isBatchFull();
     }
 
@@ -172,6 +180,10 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
             if (firstCallback != null) {
                 firstCallback.sendComplete(ex);
             }
+            if (batchedMessageMetadataAndPayload != null) {
+                ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
+                batchedMessageMetadataAndPayload = null;
+            }
         } catch (Throwable t) {
             log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
                     lowestSequenceId, t);
@@ -193,6 +205,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
             return null;
         }
         messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
+        messageMetadata.setSequenceId(lowestSequenceId);
         messageMetadata.setHighestSequenceId(highestSequenceId);
         if (currentTxnidMostBits != -1) {
             messageMetadata.setTxnidMostBits(currentTxnidMostBits);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index 505ca75743c..77990eeeacb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -18,27 +18,12 @@
  */
 package org.apache.pulsar.client.impl;
 
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Lists;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.ReferenceCountUtil;
-
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
-import org.apache.pulsar.common.api.proto.CompressionType;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.compression.CompressionCodec;
-import org.apache.pulsar.common.protocol.ByteBufPair;
-import org.apache.pulsar.common.protocol.Commands;
+import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +38,7 @@ import org.slf4j.LoggerFactory;
  */
 class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
 
-    private Map<String, KeyedBatch> batches = new HashMap<>();
+    private final Map<String, BatchMessageContainerImpl> batches = new HashMap<>();
 
     @Override
     public boolean add(MessageImpl<?> msg, SendCallback callback) {
@@ -61,29 +46,16 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
             log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName, producerName,
                     numMessagesInBatch);
         }
-        numMessagesInBatch++;
-        currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
         String key = getKey(msg);
-        KeyedBatch part = batches.get(key);
-        if (part == null) {
-            part = new KeyedBatch();
-            part.addMsg(msg, callback);
-            part.compressionType = compressionType;
-            part.compressor = compressor;
-            part.maxBatchSize = maxBatchSize;
-            part.topicName = topicName;
-            part.producerName = producerName;
-            batches.putIfAbsent(key, part);
-
-            if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {
-                currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
-            }
-            if (msg.getMessageBuilder().hasTxnidLeastBits() && currentTxnidLeastBits == -1) {
-                currentTxnidLeastBits = msg.getMessageBuilder().getTxnidLeastBits();
-            }
-
-        } else {
-            part.addMsg(msg, callback);
+        final BatchMessageContainerImpl batchMessageContainer = batches.computeIfAbsent(key,
+                __ -> new BatchMessageContainerImpl(producer));
+        batchMessageContainer.add(msg, callback);
+        // The `add` method fails iff the container is empty, i.e. the `msg` is the first message to add, while `msg`
+        // was failed to add. In this case, `clear` method will be called and the batch container is empty and there is
+        // no need to update the stats.
+        if (!batchMessageContainer.isEmpty()) {
+            numMessagesInBatch++;
+            currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
         }
         return isBatchFull();
     }
@@ -92,7 +64,7 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
     public void clear() {
         numMessagesInBatch = 0;
         currentBatchSizeBytes = 0;
-        batches = new HashMap<>();
+        batches.clear();
         currentTxnidMostBits = -1L;
         currentTxnidLeastBits = -1L;
     }
@@ -104,13 +76,7 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
 
     @Override
     public void discard(Exception ex) {
-        try {
-            // Need to protect ourselves from any exception being thrown in the future handler from the application
-            batches.forEach((k, v) -> v.firstCallback.sendComplete(ex));
-        } catch (Throwable t) {
-            log.warn("[{}] [{}] Got exception while completing the callback", topicName, producerName, t);
-        }
-        batches.forEach((k, v) -> ReferenceCountUtil.safeRelease(v.batchedMessageMetadataAndPayload));
+        batches.forEach((k, v) -> v.discard(ex));
         clear();
     }
 
@@ -119,64 +85,45 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
         return true;
     }
 
-    private ProducerImpl.OpSendMsg createOpSendMsg(KeyedBatch keyedBatch) throws IOException {
-        ByteBuf encryptedPayload = producer.encryptMessage(keyedBatch.messageMetadata, keyedBatch.getCompressedBatchMetadataAndPayload());
-        if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
-            keyedBatch.discard(new PulsarClientException.InvalidMessageException(
-                    "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
-            return null;
-        }
-
-        final int numMessagesInBatch = keyedBatch.messages.size();
-        long currentBatchSizeBytes = 0;
-        for (MessageImpl<?> message : keyedBatch.messages) {
-            currentBatchSizeBytes += message.getDataBuffer().readableBytes();
-        }
-        keyedBatch.messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
-        if (currentTxnidMostBits != -1) {
-            keyedBatch.messageMetadata.setTxnidMostBits(currentTxnidMostBits);
-        }
-        if (currentTxnidLeastBits != -1) {
-            keyedBatch.messageMetadata.setTxnidLeastBits(currentTxnidLeastBits);
-        }
-        ByteBufPair cmd = producer.sendMessage(producer.producerId, keyedBatch.sequenceId, numMessagesInBatch,
-                keyedBatch.messageMetadata, encryptedPayload);
-
-        ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create(keyedBatch.messages, cmd, keyedBatch.sequenceId, keyedBatch.firstCallback);
-
-        op.setNumMessagesInBatch(numMessagesInBatch);
-        op.setBatchSizeByte(currentBatchSizeBytes);
-        return op;
-    }
-
     @Override
     public List<ProducerImpl.OpSendMsg> createOpSendMsgs() throws IOException {
-        List<ProducerImpl.OpSendMsg> result = new ArrayList<>();
-        List<KeyedBatch> list = new ArrayList<>(batches.values());
-        list.sort(((o1, o2) -> ComparisonChain.start()
-                .compare(o1.sequenceId, o2.sequenceId)
-                .result()));
-        for (KeyedBatch keyedBatch : list) {
-            ProducerImpl.OpSendMsg op = createOpSendMsg(keyedBatch);
-            if (op != null) {
-                result.add(op);
+        try {
+            // In key based batching, the sequence ids might not be ordered, for example,
+            // | key | sequence id list |
+            // | :-- | :--------------- |
+            // | A | 0, 3, 4 |
+            // | B | 1, 2 |
+            // The message order should be 1, 2, 0, 3, 4 so that a message with a sequence id <= 4 should be dropped.
+            // However, for a MessageMetadata with both `sequence_id` and `highest_sequence_id` fields, the broker will
+            // expect a strict order so that the batch of key "A" (0, 3, 4) will be dropped.
+            // Therefore, we should update the `sequence_id` field to the highest sequence id and remove the
+            // `highest_sequence_id` field to allow the weak order.
+            batches.values().forEach(batchMessageContainer -> {
+                batchMessageContainer.setLowestSequenceId(batchMessageContainer.getHighestSequenceId());
+            });
+            return batches.values().stream().sorted((o1, o2) ->
+                    (int) (o1.getLowestSequenceId() - o2.getLowestSequenceId())
+            ).map(batchMessageContainer -> {
+                try {
+                    return batchMessageContainer.createOpSendMsg();
+                } catch (IOException e) {
+                    throw new IllegalStateException(e);
+                }
+            }).collect(Collectors.toList());
+        } catch (IllegalStateException e) {
+            if (e.getCause() instanceof IOException) {
+                throw (IOException) e.getCause();
+            } else {
+                throw e;
             }
         }
-        return result;
     }
 
     @Override
     public boolean hasSameSchema(MessageImpl<?> msg) {
         String key = getKey(msg);
-        KeyedBatch part = batches.get(key);
-        if (part == null || part.messages.isEmpty()) {
-            return true;
-        }
-        if (!part.messageMetadata.hasSchemaVersion()) {
-            return msg.getSchemaVersion() == null;
-        }
-        return Arrays.equals(msg.getSchemaVersion(),
-                             part.messageMetadata.getSchemaVersion());
+        BatchMessageContainerImpl batchMessageContainer = batches.get(key);
+        return batchMessageContainer == null || batchMessageContainer.hasSameSchema(msg);
     }
 
     private String getKey(MessageImpl<?> msg) {
@@ -186,78 +133,6 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
         return msg.getKey();
     }
 
-    private static class KeyedBatch {
-        private final MessageMetadata messageMetadata = new MessageMetadata();
-        // sequence id for this batch which will be persisted as a single entry by broker
-        private long sequenceId = -1;
-        private ByteBuf batchedMessageMetadataAndPayload;
-        private List<MessageImpl<?>> messages = Lists.newArrayList();
-        private SendCallback previousCallback = null;
-        private CompressionType compressionType;
-        private CompressionCodec compressor;
-        private int maxBatchSize;
-        private String topicName;
-        private String producerName;
-
-        // keep track of callbacks for individual messages being published in a batch
-        private SendCallback firstCallback;
-
-        private ByteBuf getCompressedBatchMetadataAndPayload() {
-            for (MessageImpl<?> msg : messages) {
-                batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msg.getMessageBuilder(),
-                        msg.getDataBuffer(), batchedMessageMetadataAndPayload);
-            }
-            int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
-            ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
-            batchedMessageMetadataAndPayload.release();
-            if (compressionType != CompressionType.NONE) {
-                messageMetadata.setCompression(compressionType);
-                messageMetadata.setUncompressedSize(uncompressedSize);
-            }
-
-            // Update the current max batch size using the uncompressed size, which is what we need in any case to
-            // accumulate the batch content
-            maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
-            return compressedPayload;
-        }
-
-        private void addMsg(MessageImpl<?> msg, SendCallback callback) {
-            if (messages.size() == 0) {
-                sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
-                batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
-                        .buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
-                firstCallback = callback;
-            }
-            if (previousCallback != null) {
-                previousCallback.addCallback(msg, callback);
-            }
-            previousCallback = callback;
-            messages.add(msg);
-        }
-
-        public void discard(Exception ex) {
-            try {
-                // Need to protect ourselves from any exception being thrown in the future handler from the application
-                if (firstCallback != null) {
-                    firstCallback.sendComplete(ex);
-                }
-            } catch (Throwable t) {
-                log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
-                        sequenceId, t);
-            }
-            clear();
-        }
-
-        public void clear() {
-            messages = Lists.newArrayList();
-            firstCallback = null;
-            previousCallback = null;
-            messageMetadata.clear();
-            sequenceId = -1;
-            batchedMessageMetadataAndPayload = null;
-        }
-    }
-
     private static final Logger log = LoggerFactory.getLogger(BatchMessageKeyBasedContainer.class);
 
 }


[pulsar] 04/06: [fix][broker] Fix MultiRolesTokenAuthorizationProvider `authorize` issue. (#15454)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d046a6ddc938d4ad74cb22e0429f134ffb3710ca
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon May 9 09:11:19 2022 +0800

    [fix][broker] Fix MultiRolesTokenAuthorizationProvider `authorize` issue. (#15454)
    
    (cherry picked from commit 19f61d53b88bb195fabb367be722694902c79d22)
    
    To resolve the conflicts, change `FutureUtil#waitForAny`'s parameter
    from `Collection` to `List` since #15329 cannot be cherry-picked.
---
 .../MultiRolesTokenAuthorizationProvider.java      | 36 +++------------
 .../org/apache/pulsar/common/util/FutureUtil.java  | 44 ++++++++++++++++++
 .../apache/pulsar/common/util/FutureUtilTest.java  | 53 ++++++++++++++++++++--
 3 files changed, 100 insertions(+), 33 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
index 9e4b71cdc6f..ba7d580e88f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
@@ -23,6 +23,12 @@ import io.jsonwebtoken.Jwt;
 import io.jsonwebtoken.JwtParser;
 import io.jsonwebtoken.Jwts;
 import io.jsonwebtoken.RequiredTypeException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -38,14 +44,6 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
-
 public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationProvider {
     private static final Logger log = LoggerFactory.getLogger(MultiRolesTokenAuthorizationProvider.class);
 
@@ -136,27 +134,7 @@ public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationPro
         }
         List<CompletableFuture<Boolean>> futures = new ArrayList<>(roles.size());
         roles.forEach(r -> futures.add(authorizeFunc.apply(r)));
-        return CompletableFuture.supplyAsync(() -> {
-            do {
-                try {
-                    List<CompletableFuture<Boolean>> doneFutures = new ArrayList<>();
-                    FutureUtil.waitForAny(futures).get();
-                    for (CompletableFuture<Boolean> future : futures) {
-                        if (!future.isDone()) continue;
-                        doneFutures.add(future);
-                        if (future.get()) {
-                            futures.forEach(f -> {
-                                if (!f.isDone()) f.cancel(false);
-                            });
-                            return true;
-                        }
-                    }
-                    futures.removeAll(doneFutures);
-                } catch (InterruptedException | ExecutionException ignored) {
-                }
-            } while (!futures.isEmpty());
-            return false;
-        });
+        return FutureUtil.waitForAny(futures, ret -> (boolean) ret).thenApply(v -> v.isPresent());
     }
 
     /**
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 7d9fd79aa7f..e0a4d8f818e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -28,7 +28,9 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 /**
  * This class is aimed at simplifying work with {@code CompletableFuture}.
@@ -55,6 +57,48 @@ public class FutureUtil {
         return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
     }
 
+    /**
+     * Return a future that represents the completion of any future that match the predicate in the provided Collection.
+     *
+     * @param futures futures to wait any
+     * @param tester if any future match the predicate
+     * @return a new CompletableFuture that is completed when any of the given CompletableFutures match the tester
+     */
+    public static CompletableFuture<Optional<Object>> waitForAny(List<? extends CompletableFuture<?>> futures,
+                                                       Predicate<Object> tester) {
+        return waitForAny(futures).thenCompose(v -> {
+            if (tester.test(v)) {
+                futures.forEach(f -> {
+                    if (!f.isDone()) {
+                        f.cancel(true);
+                    }
+                });
+                return CompletableFuture.completedFuture(Optional.of(v));
+            }
+            List<CompletableFuture<?>> doneFutures = futures.stream()
+                    .filter(f -> f.isDone())
+                    .collect(Collectors.toList());
+            futures.removeAll(doneFutures);
+            Optional<?> value = doneFutures.stream()
+                    .filter(f -> !f.isCompletedExceptionally())
+                    .map(CompletableFuture::join)
+                    .filter(tester)
+                    .findFirst();
+            if (!value.isPresent()) {
+                if (futures.size() == 0) {
+                    return CompletableFuture.completedFuture(Optional.empty());
+                }
+                return waitForAny(futures, tester);
+            }
+            futures.forEach(f -> {
+                if (!f.isDone()) {
+                    f.cancel(true);
+                }
+            });
+            return CompletableFuture.completedFuture(Optional.of(value.get()));
+        });
+    }
+
 
     /**
      * Return a future that represents the completion of the futures in the provided list.
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
index b9458bf8e1e..5adff2c1585 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
@@ -19,19 +19,24 @@
 
 package org.apache.pulsar.common.util;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.fail;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.time.Duration;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 import lombok.Cleanup;
+import org.assertj.core.util.Lists;
 import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 public class FutureUtilTest {
 
@@ -91,4 +96,44 @@ public class FutureUtilTest {
             assertEquals(executionException.getCause(), e);
         }
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testWaitForAny() {
+        CompletableFuture<String> f1 = new CompletableFuture<>();
+        CompletableFuture<String> f2 = new CompletableFuture<>();
+        CompletableFuture<String> f3 = new CompletableFuture<>();
+        CompletableFuture<String> f4 = new CompletableFuture<>();
+        f1.complete("1");
+        f2.complete("2");
+        f3.complete("3");
+        f4.complete("4");
+        CompletableFuture<Optional<Object>> ret = FutureUtil.waitForAny(Lists.newArrayList(f1, f2, f3, f4), p -> p.equals("3"));
+        assertEquals(ret.join().get(), "3");
+        // test not matched predicate result
+        CompletableFuture<String> f5 = new CompletableFuture<>();
+        CompletableFuture<String> f6 = new CompletableFuture<>();
+        f5.complete("5");
+        f6.complete("6");
+        ret = FutureUtil.waitForAny(Lists.newArrayList(f5, f6), p -> p.equals("3"));
+        assertFalse(ret.join().isPresent());
+        // test one complete, others are cancelled.
+        CompletableFuture<String> f55 = new CompletableFuture<>();
+        CompletableFuture<String> f66 = new CompletableFuture<>();
+        f55.complete("55");
+        ret = FutureUtil.waitForAny(Lists.newArrayList(f55, f66), p -> p.equals("55"));
+        assertTrue(ret.join().isPresent());
+        assertTrue(f66.isCancelled());
+        // test with exception
+        CompletableFuture<String> f7 = new CompletableFuture<>();
+        CompletableFuture<String> f8 = new CompletableFuture<>();
+        f8.completeExceptionally(new RuntimeException("f7 exception"));
+        f8.completeExceptionally(new RuntimeException("f8 exception"));
+        ret = FutureUtil.waitForAny(Lists.newArrayList(f7, f8), p -> p.equals("3"));
+        try {
+            ret.join();
+            fail("Should have failed");
+        } catch (CompletionException ex) {
+            assertTrue(ex.getCause() instanceof RuntimeException);
+        }
+    }
+}


[pulsar] 01/06: Fix producerFuture not complete in ServerCnx#handleProducer (#14467)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dce262da868b492b9af7f461e8785ab55479886c
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Fri Feb 25 15:56:55 2022 +0800

    Fix producerFuture not complete in ServerCnx#handleProducer (#14467)
    
    (cherry picked from commit e937da92a5c6b79f44436720a81762b0ce6a8139)
---
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bf65a72fb75..138f9490df7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1339,6 +1339,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             }
                             producers.remove(producerId, producerFuture);
 
+                            log.error("producerId {}, requestId {} : TransactionBuffer recover failed",
+                                    producerId, requestId, exception);
+                            if (producerFuture.completeExceptionally(exception)) {
+                                commandSender.sendErrorResponse(requestId,
+                                        ServiceUnitNotReadyException.getClientErrorCode(cause),
+                                        cause.getMessage());
+                            }
+                            producers.remove(producerId, producerFuture);
                             return null;
                         });
                     } else {


[pulsar] 03/06: [C++] Fix send callback might not be invoked in key based batching (#14898)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f49047debfe5e04b4dacfd44b42e01b400e86936
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Mar 29 01:45:14 2022 +0800

    [C++] Fix send callback might not be invoked in key based batching (#14898)
    
    * [C++] Fix send callback might not be invoked in key based batching
    
    ### Motivation
    
    When C++ client enables key based batching, there is a chance that the
    send callback is not invoked. See
    https://github.com/apache/pulsar/blob/32df93f693bfdf42953bd728a12ecdea1796dcc8/pulsar-client-cpp/lib/ProducerImpl.cc#L272-L275
    
    If a batch container has multiple batches, only one batch could be
    processed during `closeAsync`. Even worse, the semaphores of other
    batches won't be released.
    
    ### Modifications
    
    - Add a `clearPendingBatches` method to clear all pending batches and
      process them. Then call this method in `closeAsync` and
      `getPendingCallbacksWhenFailed`.
    - Add a test `testCloseBeforeSend` to verify when a producer has
      multiple pending batches, all callbacks can be invoked in
      `closeAsync`.
    
    * Add processAndClear() to batch message container
    
    (cherry picked from commit f3295ff0b14526de27791493d4c45cf814ef3654)
---
 pulsar-client-cpp/lib/BatchMessageContainerBase.h | 26 +++++++++++++
 pulsar-client-cpp/lib/ProducerImpl.cc             | 47 ++++++-----------------
 pulsar-client-cpp/tests/KeyBasedBatchingTest.cc   | 32 ++++++++++++++-
 3 files changed, 69 insertions(+), 36 deletions(-)

diff --git a/pulsar-client-cpp/lib/BatchMessageContainerBase.h b/pulsar-client-cpp/lib/BatchMessageContainerBase.h
index 8a32d8e9dca..71eef5fab62 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainerBase.h
+++ b/pulsar-client-cpp/lib/BatchMessageContainerBase.h
@@ -112,6 +112,9 @@ class BatchMessageContainerBase : public boost::noncopyable {
     bool hasEnoughSpace(const Message& msg) const noexcept;
     bool isEmpty() const noexcept;
 
+    void processAndClear(std::function<void(Result, const OpSendMsg&)> opSendMsgCallback,
+                         FlushCallback flushCallback);
+
    protected:
     // references to ProducerImpl's fields
     const std::string& topicName_;
@@ -157,6 +160,29 @@ inline void BatchMessageContainerBase::resetStats() {
     sizeInBytes_ = 0;
 }
 
+inline void BatchMessageContainerBase::processAndClear(
+    std::function<void(Result, const OpSendMsg&)> opSendMsgCallback, FlushCallback flushCallback) {
+    if (isEmpty()) {
+        if (flushCallback) {
+            flushCallback(ResultOk);
+        }
+    } else {
+        const auto numBatches = getNumBatches();
+        if (numBatches == 1) {
+            OpSendMsg opSendMsg;
+            Result result = createOpSendMsg(opSendMsg, flushCallback);
+            opSendMsgCallback(result, opSendMsg);
+        } else if (numBatches > 1) {
+            std::vector<OpSendMsg> opSendMsgs;
+            std::vector<Result> results = createOpSendMsgs(opSendMsgs, flushCallback);
+            for (size_t i = 0; i < results.size(); i++) {
+                opSendMsgCallback(results[i], opSendMsgs[i]);
+            }
+        }  // else numBatches is 0, do nothing
+    }
+    clear();
+}
+
 inline std::ostream& operator<<(std::ostream& os, const BatchMessageContainerBase& container) {
     container.serialize(os);
     return os;
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index e9812d46054..e15d388ef64 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -268,13 +268,14 @@ std::shared_ptr<ProducerImpl::PendingCallbacks> ProducerImpl::getPendingCallback
     }
 
     if (batchMessageContainer_) {
-        OpSendMsg opSendMsg;
-        if (batchMessageContainer_->createOpSendMsg(opSendMsg) == ResultOk) {
-            callbacks->opSendMsgs.emplace_back(opSendMsg);
-        }
-
-        releaseSemaphoreForSendOp(opSendMsg);
-        batchMessageContainer_->clear();
+        batchMessageContainer_->processAndClear(
+            [this, &callbacks](Result result, const OpSendMsg& opSendMsg) {
+                if (result == ResultOk) {
+                    callbacks->opSendMsgs.emplace_back(opSendMsg);
+                }
+                releaseSemaphoreForSendOp(opSendMsg);
+            },
+            nullptr);
     }
     pendingMessagesQueue_.clear();
 
@@ -507,15 +508,8 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall
     LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_);
     batchTimer_->cancel();
 
-    if (PULSAR_UNLIKELY(batchMessageContainer_->isEmpty())) {
-        if (flushCallback) {
-            flushCallback(ResultOk);
-        }
-    } else {
-        const size_t numBatches = batchMessageContainer_->getNumBatches();
-        if (numBatches == 1) {
-            OpSendMsg opSendMsg;
-            Result result = batchMessageContainer_->createOpSendMsg(opSendMsg, flushCallback);
+    batchMessageContainer_->processAndClear(
+        [this, &failures](Result result, const OpSendMsg& opSendMsg) {
             if (result == ResultOk) {
                 sendMessage(opSendMsg);
             } else {
@@ -525,25 +519,8 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall
                 releaseSemaphoreForSendOp(opSendMsg);
                 failures.add(std::bind(opSendMsg.sendCallback_, result, MessageId{}));
             }
-        } else if (numBatches > 1) {
-            std::vector<OpSendMsg> opSendMsgs;
-            std::vector<Result> results = batchMessageContainer_->createOpSendMsgs(opSendMsgs, flushCallback);
-            for (size_t i = 0; i < results.size(); i++) {
-                if (results[i] == ResultOk) {
-                    sendMessage(opSendMsgs[i]);
-                } else {
-                    // A spot has been reserved for this batch, but the batch failed to be pushed to the
-                    // queue, so we need to release the spot manually
-                    LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsgs[" << i
-                                                                                  << "]: " << results[i]);
-                    releaseSemaphoreForSendOp(opSendMsgs[i]);
-                    failures.add(std::bind(opSendMsgs[i].sendCallback_, results[i], MessageId{}));
-                }
-            }
-        }  // else numBatches is 0, do nothing
-    }
-
-    batchMessageContainer_->clear();
+        },
+        flushCallback);
     return failures;
 }
 
diff --git a/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc b/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
index 7b39554806b..d22152f4d81 100644
--- a/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
+++ b/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
@@ -41,7 +41,6 @@ class KeyBasedBatchingTest : public ::testing::Test {
 
     void TearDown() override { client_.close(); }
 
-    void setTopicName(const std::string& topicName) { topicName_ = topicName; }
     void initTopicName(const std::string& testName) {
         topicName_ = "KeyBasedBatchingTest-" + testName + "-" + std::to_string(time(nullptr));
     }
@@ -178,3 +177,34 @@ TEST_F(KeyBasedBatchingTest, testSingleBatch) {
     ASSERT_EQ(ResultTimeout, consumer_.receive(msg, 3000));
     ASSERT_EQ(numMessageSent.load(), numMessages);
 }
+
+TEST_F(KeyBasedBatchingTest, testCloseBeforeSend) {
+    initTopicName("CloseBeforeSend");
+    // Any asynchronous send won't be completed unless `close()` or `flush()` is triggered
+    initProducer(createDefaultProducerConfig().setBatchingMaxMessages(static_cast<unsigned>(-1)));
+
+    std::mutex mtx;
+    std::vector<Result> results;
+    auto saveResult = [&mtx, &results](Result result) {
+        std::lock_guard<std::mutex> lock(mtx);
+        results.emplace_back(result);
+    };
+    auto sendAsync = [saveResult, this](const std::string& key, const std::string& value) {
+        producer_.sendAsync(MessageBuilder().setOrderingKey(key).setContent(value).build(),
+                            [saveResult](Result result, const MessageId& id) { saveResult(result); });
+    };
+
+    constexpr int numKeys = 10;
+    for (int i = 0; i < numKeys; i++) {
+        sendAsync("key-" + std::to_string(i), "value");
+    }
+
+    ASSERT_EQ(ResultOk, producer_.close());
+
+    // After close() completed, all callbacks should have failed with ResultAlreadyClosed
+    std::lock_guard<std::mutex> lock(mtx);
+    ASSERT_EQ(results.size(), numKeys);
+    for (int i = 0; i < numKeys; i++) {
+        ASSERT_EQ(results[i], ResultAlreadyClosed) << " results[" << i << "] is " << results[i];
+    }
+}


[pulsar] 02/06: [cleanup] [broker] Remove useless code to avoid confusion in OpReadEntry#checkReadCompletion. (#15104)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d5cee50635bbe61e642b33ab8f76fd15f3d60f2e
Author: 赵延 <ho...@apache.org>
AuthorDate: Mon Apr 11 12:34:36 2022 +0800

    [cleanup] [broker] Remove useless code to avoid confusion in OpReadEntry#checkReadCompletion. (#15104)
    
    (cherry picked from commit 93761284b9f6875da0403f5fedb6ccbfbbcd7315)
---
 .../main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java    | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 91a6e26f567..b751034a0c4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -137,11 +137,6 @@ class OpReadEntry implements ReadEntriesCallback {
         if (entries.size() < count && cursor.hasMoreEntries() &&
                 ((PositionImpl) cursor.getReadPosition()).compareTo(maxPosition) < 0) {
             // We still have more entries to read from the next ledger, schedule a new async operation
-            if (nextReadPosition.getLedgerId() != readPosition.getLedgerId()) {
-                cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
-            }
-
-            // Schedule next read in a different thread
             cursor.ledger.getExecutor().execute(safeRun(() -> {
                 readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
                 cursor.ledger.asyncReadEntries(OpReadEntry.this);


[pulsar] 05/06: [C++] Remove the flaky and meaningless tests (#15271)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 70afb39f2e4049d5f1c935fd9c8a57df6633494c
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Apr 22 23:48:42 2022 +0800

    [C++] Remove the flaky and meaningless tests (#15271)
    
    Fixes #13849
    Fixes #14848
    
    ### Motivation
    
    #11570 adds a `testSendAsyncCloseAsyncConcurrentlyWithLazyProducers` for
    the case that some `sendAsync` calls that are invoked after `closeAsync`
    is called in another thread must complete with `ResultAlreadyClosed`.
    It's flaky because the synchronization between two threads is not
    strict. This test uses `sendStartLatch` for the order of `sendAsync` and
    `closeAsync`:
    
    ```
    sendAsync 0,1,...,9 -> sendStartLatch is done -> closeAsync
    ```
    
    However, it cannot guarantee the rest `sendAsync` calls happen after
    `closeAsync` is called. If so, all `sendAsync` calls will complete with
    `ResultOk`.
    
    On the other hand, this test is meaningless because it requires strict
    synchronization between two threads so there is no need to run
    `sendAsync` and `closeAsync` in two threads.
    
    The verification of this test is also wrong, see
    https://github.com/apache/pulsar/issues/13849#issuecomment-1079098248.
    When `closeAsync` is called, the previous `sendAsync` calls might not
    complete, so all `sendAsync` will complete with `ResultAlreadyClosed`,
    not only those called after `closeAsync`.
    
    In addition, this PR also tries to fix the flaky `testReferenceCount`,
    which assumes too strictly.
    
    ### Modifications
    
    - Remove `testSendAsyncCloseAsyncConcurrentlyWithLazyProducers`
    - Only check the reference count is greater than 0 instead of equal to 1
    
    (cherry picked from commit eeea9ca1f6eeef1248b7fe8f36be30be835d2480)
---
 pulsar-client-cpp/tests/ClientTest.cc   |  2 +-
 pulsar-client-cpp/tests/ProducerTest.cc | 85 +--------------------------------
 2 files changed, 2 insertions(+), 85 deletions(-)

diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 1ba0164ad87..364e170f896 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -211,7 +211,7 @@ TEST(ClientTest, testReferenceCount) {
         LOG_INFO("Reference count of the reader's underlying consumer: " << consumers[1].use_count());
 
         readerWeakPtr = PulsarFriend::getReaderImplWeakPtr(reader);
-        ASSERT_EQ(readerWeakPtr.use_count(), 1);
+        ASSERT_TRUE(readerWeakPtr.use_count() > 0);
         LOG_INFO("Reference count of the reader: " << readerWeakPtr.use_count());
     }
 
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc
index 14461429da3..559cc6f2971 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -159,89 +159,6 @@ TEST(ProducerTest, testSendAsyncAfterCloseAsyncWithLazyProducers) {
     ASSERT_EQ(ResultOk, result);
 }
 
-TEST(ProducerTest, testSendAsyncCloseAsyncConcurrentlyWithLazyProducers) {
-    // run sendAsync and closeAsync concurrently and verify that all sendAsync callbacks are called
-    // and that messages sent after closeAsync is invoked receive ResultAlreadyClosed.
-    for (int run = 0; run < 20; run++) {
-        LOG_INFO("Start of run " << run);
-        Client client(serviceUrl);
-        const std::string partitionedTopic =
-            "testProducerIsConnectedPartitioned-" + std::to_string(time(nullptr));
-
-        int res = makePutRequest(
-            adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "10");
-        ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
-
-        ProducerConfiguration producerConfiguration;
-        producerConfiguration.setLazyStartPartitionedProducers(true);
-        producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
-        producerConfiguration.setBatchingEnabled(true);
-        Producer producer;
-        ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfiguration, producer));
-
-        int sendCount = 100;
-        std::vector<Promise<Result, MessageId>> promises(sendCount);
-        Promise<bool, Result> promiseClose;
-
-        // only call closeAsync once at least 10 messages have been sent
-        Latch sendStartLatch(10);
-        Latch closeLatch(1);
-        int closedAt = 0;
-
-        std::thread t1([&]() {
-            for (int i = 0; i < sendCount; i++) {
-                sendStartLatch.countdown();
-                Message msg = MessageBuilder().setContent("test").build();
-
-                if (closeLatch.getCount() == 0 && closedAt == 0) {
-                    closedAt = i;
-                    LOG_INFO("closedAt set to " << closedAt)
-                }
-
-                producer.sendAsync(msg, WaitForCallbackValue<MessageId>(promises[i]));
-                std::this_thread::sleep_for(std::chrono::milliseconds(1));
-            }
-        });
-
-        std::thread t2([&]() {
-            sendStartLatch.wait(std::chrono::milliseconds(1000));
-            LOG_INFO("Closing");
-            producer.closeAsync(WaitForCallback(promiseClose));
-            LOG_INFO("Close called");
-            closeLatch.countdown();
-            Result result;
-            promiseClose.getFuture().get(result);
-            ASSERT_EQ(ResultOk, result);
-            LOG_INFO("Closed");
-        });
-
-        t1.join();
-        t2.join();
-
-        // make sure that all messages after the moment when closeAsync was invoked
-        // return AlreadyClosed
-        for (int i = 0; i < sendCount; i++) {
-            LOG_DEBUG("Checking " << i)
-
-            // whether a message was sent successfully or not, it's callback
-            // must have been invoked
-            ASSERT_EQ(true, promises[i].isComplete());
-            MessageId mi;
-            Result res = promises[i].getFuture().get(mi);
-            LOG_DEBUG("Result is " << res);
-
-            // for the messages sent after closeAsync was invoked, they
-            // should all return ResultAlreadyClosed
-            if (i >= closedAt) {
-                ASSERT_EQ(ResultAlreadyClosed, res);
-            }
-        }
-
-        client.close();
-        LOG_INFO("End of run " << run);
-    }
-}
-
 TEST(ProducerTest, testBacklogQuotasExceeded) {
     std::string ns = "public/test-backlog-quotas";
     std::string topic = ns + "/testBacklogQuotasExceeded" + std::to_string(time(nullptr));
@@ -283,4 +200,4 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
     ASSERT_EQ(ResultOk, client.createProducer(partition, producer));
 
     client.close();
-}
\ No newline at end of file
+}