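You are viewing a plain-text version of this content. The canonical version is the original message in the commits@pulsar.apache.org mailing-list archive (the hyperlink to it was lost in this plain-text copy).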
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/09 11:52:36 UTC

[pulsar] branch branch-2.9 updated (b87bd78 -> 593c2da)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from b87bd78  [Broker] Synchronize updates to the inactiveProducers map in MessageDeduplication (#12820)
     new 15b2831  [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order (#12456)
     new 593c2da  [Broker] Fix producer getting incorrectly removed from topic's producers map (#12846)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pulsar/broker/service/AbstractTopic.java       |  8 +--
 .../org/apache/pulsar/broker/service/Producer.java | 27 ++++----
 .../pulsar/broker/service/PersistentTopicTest.java | 14 +++-
 .../pulsar/broker/service/ServerCnxTest.java       | 66 +++++++++++++++++++
 .../pulsar/client/api/MultiTopicsConsumerTest.java | 75 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  3 +-
 .../client/impl/MultiTopicsConsumerImpl.java       | 37 ++++++-----
 .../client/impl/MultiTopicsConsumerImplTest.java   |  3 +-
 8 files changed, 189 insertions(+), 44 deletions(-)

[pulsar] 01/02: [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order (#12456)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 15b2831c33f74b38a430c80ca9b2825b7aa0da26
Author: Michael Marshall <mi...@datastax.com>
AuthorDate: Mon Oct 25 16:31:59 2021 -0500

    [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order (#12456)
    
    * [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order
    
    * Fix test
    
    * Return the checkState method call to keep original behavior
    
    * Reproduce out-of-order delivery issue in PR 12456
    
    * Remove unnecessary scheduling of receiveMessageFromConsumer
    
    Co-authored-by: Lari Hotari <lh...@apache.org>
    (cherry picked from commit 6a2e3a1ad735465154dc3fa12988c3068eae7da5)
---
 .../pulsar/client/api/MultiTopicsConsumerTest.java | 75 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  3 +-
 .../client/impl/MultiTopicsConsumerImpl.java       | 37 ++++++-----
 .../client/impl/MultiTopicsConsumerImplTest.java   |  3 +-
 4 files changed, 98 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index 715f3ad..d8c8bd6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pulsar.client.api;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -24,9 +25,16 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import com.google.common.collect.Lists;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -34,6 +42,7 @@ import org.mockito.AdditionalAnswers;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -70,6 +79,7 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase {
                         // method calls on the interface.
                         Mockito.withSettings().defaultAnswer(AdditionalAnswers.delegatesTo(internalExecutorService)));
             }
+
             @Override
             public ExecutorService getInternalExecutorService() {
                 return internalExecutorServiceDelegate;
@@ -119,4 +129,69 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase {
         verify(internalExecutorServiceDelegate, times(0))
                 .schedule(any(Runnable.class), anyLong(), any());
     }
+
+    // test that reproduces the issue that PR https://github.com/apache/pulsar/pull/12456 fixes
+    // where MultiTopicsConsumerImpl has a data race that causes out-of-order delivery of messages
+    @Test
+    public void testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
+            throws PulsarAdminException, PulsarClientException, ExecutionException, InterruptedException,
+            TimeoutException {
+        String topicName = newTopicName();
+        int numPartitions = 2;
+        int numMessages = 100000;
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
+
+        Producer<Long>[] producers = new Producer[numPartitions];
+
+        for (int i = 0; i < numPartitions; i++) {
+            producers[i] = pulsarClient.newProducer(Schema.INT64)
+                    // produce to each partition directly so that order can be maintained in sending
+                    .topic(topicName + "-partition-" + i)
+                    .enableBatching(true)
+                    .maxPendingMessages(30000)
+                    .maxPendingMessagesAcrossPartitions(60000)
+                    .batchingMaxMessages(10000)
+                    .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
+                    .batchingMaxBytes(4 * 1024 * 1024)
+                    .blockIfQueueFull(true)
+                    .create();
+        }
+
+        @Cleanup
+        Consumer<Long> consumer = pulsarClient
+                .newConsumer(Schema.INT64)
+                // consume on the partitioned topic
+                .topic(topicName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .receiverQueueSize(numMessages)
+                .subscriptionName(methodName)
+                .subscribe();
+
+        // produce sequence numbers to each partition topic
+        long sequenceNumber = 1L;
+        for (int i = 0; i < numMessages; i++) {
+            for (Producer<Long> producer : producers) {
+                producer.newMessage()
+                        .value(sequenceNumber)
+                        .sendAsync();
+            }
+            sequenceNumber++;
+        }
+        for (Producer<Long> producer : producers) {
+            producer.close();
+        }
+
+        // receive and validate sequences in the partitioned topic
+        Map<String, AtomicLong> receivedSequences = new HashMap<>();
+        int receivedCount = 0;
+        while (receivedCount < numPartitions * numMessages) {
+            Message<Long> message = consumer.receiveAsync().get(5, TimeUnit.SECONDS);
+            consumer.acknowledge(message);
+            receivedCount++;
+            AtomicLong receivedSequenceCounter =
+                    receivedSequences.computeIfAbsent(message.getTopicName(), k -> new AtomicLong(1L));
+            Assert.assertEquals(message.getValue().longValue(), receivedSequenceCounter.getAndIncrement());
+        }
+        Assert.assertEquals(numPartitions * numMessages, receivedCount);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index acc7cea..7e42d0d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -427,8 +427,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             if (message == null) {
                 pendingReceives.add(result);
                 cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
-            }
-            if (message != null) {
+            } else {
                 messageProcessed(message);
                 result.complete(beforeConsume(message));
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 18bd7bc..2c356fb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -245,7 +245,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
-        consumer.receiveAsync().thenAccept(message -> {
+        consumer.receiveAsync().thenAcceptAsync(message -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Receive message from sub consumer:{}",
                     topic, subscription, consumer.getTopic());
@@ -260,16 +260,16 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 // or if any consumer is already paused (to create fair chance for already paused consumers)
                 pausedConsumers.add(consumer);
 
-                // Since we din't get a mutex, the condition on the incoming queue might have changed after
+                // Since we didn't get a mutex, the condition on the incoming queue might have changed after
                 // we have paused the current consumer. We need to re-check in order to avoid this consumer
                 // from getting stalled.
                 resumeReceivingFromPausedConsumersIfNeeded();
             } else {
-                // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
-                // recursion and stack overflow
-                internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
+                // Call receiveAsync() if the incoming queue is not full. Because this block is run with
+                // thenAcceptAsync, there is no chance for recursion that would lead to stack overflow.
+                receiveMessageFromConsumer(consumer);
             }
-        }).exceptionally(ex -> {
+        }, internalPinnedExecutor).exceptionally(ex -> {
             if (ex instanceof PulsarClientException.AlreadyClosedException
                     || ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
                 // ignore the exception that happens when the consumer is closed
@@ -281,6 +281,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         });
     }
 
+    // Must be called from the internalPinnedExecutor thread
     private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
         checkArgument(message instanceof MessageImpl);
         TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(consumer.getTopic(),
@@ -409,17 +410,19 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     protected CompletableFuture<Message<T>> internalReceiveAsync() {
         CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
         CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
-        Message<T> message = incomingMessages.poll();
-        if (message == null) {
-            pendingReceives.add(result);
-            cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
-        } else {
-            decreaseIncomingMessageSize(message);
-            checkState(message instanceof TopicMessageImpl);
-            unAckedMessageTracker.add(message.getMessageId());
-            resumeReceivingFromPausedConsumersIfNeeded();
-            result.complete(message);
-        }
+        internalPinnedExecutor.execute(() -> {
+            Message<T> message = incomingMessages.poll();
+            if (message == null) {
+                pendingReceives.add(result);
+                cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
+            } else {
+                decreaseIncomingMessageSize(message);
+                checkState(message instanceof TopicMessageImpl);
+                unAckedMessageTracker.add(message.getMessageId());
+                resumeReceivingFromPausedConsumersIfNeeded();
+                result.complete(message);
+            }
+        });
         return result;
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 6af8914..faa621c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.testng.annotations.AfterMethod;
@@ -165,7 +166,7 @@ public class MultiTopicsConsumerImplTest {
         // given
         MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
         CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
-        assertTrue(consumer.hasNextPendingReceive());
+        Awaitility.await().untilAsserted(() -> assertTrue(consumer.hasNextPendingReceive()));
         // when
         future.cancel(true);
         // then

[pulsar] 02/02: [Broker] Fix producer getting incorrectly removed from topic's producers map (#12846)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 593c2da8fd4bb8b8838ba9bb1d93c978baccd9ad
Author: Michael Marshall <mi...@datastax.com>
AuthorDate: Thu Nov 18 02:42:56 2021 -0600

    [Broker] Fix producer getting incorrectly removed from topic's producers map (#12846)
    
    (cherry picked from commit e33687d3f202ab104d41ad086c48b66b6f0d5ff5)
---
 .../pulsar/broker/service/AbstractTopic.java       |  8 +--
 .../org/apache/pulsar/broker/service/Producer.java | 27 ++++-----
 .../pulsar/broker/service/PersistentTopicTest.java | 14 ++++-
 .../pulsar/broker/service/ServerCnxTest.java       | 66 ++++++++++++++++++++++
 4 files changed, 91 insertions(+), 24 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 951d602..9463be9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -631,13 +631,9 @@ public abstract class AbstractTopic implements Topic {
 
     private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
             throws BrokerServiceException {
-        boolean canOverwrite = false;
-        if (oldProducer.equals(newProducer) && !isUserProvidedProducerName(oldProducer)
-                && !isUserProvidedProducerName(newProducer) && newProducer.getEpoch() > oldProducer.getEpoch()) {
+        if (newProducer.isSuccessorTo(oldProducer) && !isUserProvidedProducerName(oldProducer)
+                && !isUserProvidedProducerName(newProducer)) {
             oldProducer.close(false);
-            canOverwrite = true;
-        }
-        if (canOverwrite) {
             if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
                 // Met concurrent update, throw exception here so that client can try reconnect later.
                 throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index d13e2f9..64e0fd6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -138,22 +138,17 @@ public class Producer {
         this.clientAddress = cnx.clientSourceAddress();
     }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(producerName);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof Producer) {
-            Producer other = (Producer) obj;
-            return Objects.equals(producerName, other.producerName)
-                    && Objects.equals(topic, other.topic)
-                    && producerId == other.producerId
-                    && Objects.equals(cnx, other.cnx);
-        }
-
-        return false;
+    /**
+     * Method to determine if this producer can replace another producer.
+     * @param other - producer to compare to this one
+     * @return true if this producer is a subsequent instantiation of the same logical producer. Otherwise, false.
+     */
+    public boolean isSuccessorTo(Producer other) {
+        return Objects.equals(producerName, other.producerName)
+                && Objects.equals(topic, other.topic)
+                && producerId == other.producerId
+                && Objects.equals(cnx, other.cnx)
+                && other.getEpoch() < epoch;
     }
 
     public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 543d897..cb10d72 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -36,6 +36,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -440,11 +441,20 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
             // OK
         }
 
-        // 4. simple remove producer
+        // 4. Try to remove with unequal producer
+        Producer producerCopy = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
+                role, false, null, SchemaVersion.Latest, 0, false,
+                ProducerAccessMode.Shared, Optional.empty());
+        topic.removeProducer(producerCopy);
+        // Expect producer to be in map
+        assertEquals(topic.getProducers().size(), 1);
+        assertSame(topic.getProducers().get(producer.getProducerName()), producer);
+
+        // 5. simple remove producer
         topic.removeProducer(producer);
         assertEquals(topic.getProducers().size(), 0);
 
-        // 5. duplicate remove
+        // 6. duplicate remove
         topic.removeProducer(producer); /* noop */
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 3d3a30e..8881ce9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -838,6 +838,72 @@ public class ServerCnxTest {
         channel.finish();
     }
 
+    @Test(timeOut = 30000)
+    public void testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail() throws Exception {
+        resetChannel();
+        setChannelConnected();
+
+        // Delay the topic creation in a deterministic way
+        CompletableFuture<Runnable> openTopicFuture = new CompletableFuture<>();
+        doAnswer(invocationOnMock -> {
+            openTopicFuture.complete(() -> {
+                ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+            });
+            return null;
+        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
+                any(OpenLedgerCallback.class), any(Supplier.class), any());
+
+        // In a create producer timeout from client side we expect to see this sequence of commands :
+        // 1. create producer
+        // 2. close producer (when the timeout is triggered, which may be before the producer was created on the broker
+        // 3. create producer (triggered by reconnection logic)
+        // Then, when another producer is created with the same name, it should fail. Because we only have one
+        // channel here, we just use a different producer id
+
+        // These operations need to be serialized, to allow the last create producer to finally succeed
+        // (There can be more create/close pairs in the sequence, depending on the client timeout
+
+        String producerName = "my-producer";
+
+        ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
+                producerName, Collections.emptyMap(), false);
+        channel.writeInbound(createProducer1);
+
+        ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ );
+        channel.writeInbound(closeProducer);
+
+        ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */,
+                producerName, Collections.emptyMap(), false);
+        channel.writeInbound(createProducer2);
+
+        // Complete the topic opening: It will make 2nd producer creation successful
+        openTopicFuture.get().run();
+
+        // Close succeeds
+        Object response = getResponse();
+        assertEquals(response.getClass(), CommandSuccess.class);
+        assertEquals(((CommandSuccess) response).getRequestId(), 2);
+
+        // 2nd producer will be successfully created as topic is open by then
+        response = getResponse();
+        assertEquals(response.getClass(), CommandProducerSuccess.class);
+        assertEquals(((CommandProducerSuccess) response).getRequestId(), 3);
+
+        // Send create command after getting the CommandProducerSuccess to ensure correct ordering
+        ByteBuf createProducer3 = Commands.newProducer(successTopicName, 2 /* producer id */, 4 /* request id */,
+                producerName, Collections.emptyMap(), false);
+        channel.writeInbound(createProducer3);
+
+        // 3nd producer will fail
+        response = getResponse();
+        assertEquals(response.getClass(), CommandError.class);
+        assertEquals(((CommandError) response).getRequestId(), 4);
+
+        assertTrue(channel.isActive());
+
+        channel.finish();
+    }
+
     @Test(timeOut = 30000, enabled = false)
     public void testCreateProducerMultipleTimeouts() throws Exception {
         resetChannel();