You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/18 08:44:37 UTC

[pulsar] branch master updated: [Broker] Fix producer getting incorrectly removed from topic's producers map (#12846)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e33687d  [Broker] Fix producer getting incorrectly removed from topic's producers map (#12846)
e33687d is described below

commit e33687d3f202ab104d41ad086c48b66b6f0d5ff5
Author: Michael Marshall <mi...@datastax.com>
AuthorDate: Thu Nov 18 02:42:56 2021 -0600

    [Broker] Fix producer getting incorrectly removed from topic's producers map (#12846)
---
 .../pulsar/broker/service/AbstractTopic.java       |  8 +--
 .../org/apache/pulsar/broker/service/Producer.java | 27 ++++-----
 .../pulsar/broker/service/PersistentTopicTest.java | 14 ++++-
 .../pulsar/broker/service/ServerCnxTest.java       | 66 ++++++++++++++++++++++
 4 files changed, 91 insertions(+), 24 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index f4f7615..12bf097 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -639,13 +639,9 @@ public abstract class AbstractTopic implements Topic {
 
     private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
             throws BrokerServiceException {
-        boolean canOverwrite = false;
-        if (oldProducer.equals(newProducer) && !isUserProvidedProducerName(oldProducer)
-                && !isUserProvidedProducerName(newProducer) && newProducer.getEpoch() > oldProducer.getEpoch()) {
+        if (newProducer.isSuccessorTo(oldProducer) && !isUserProvidedProducerName(oldProducer)
+                && !isUserProvidedProducerName(newProducer)) {
             oldProducer.close(false);
-            canOverwrite = true;
-        }
-        if (canOverwrite) {
             if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
                 // Met concurrent update, throw exception here so that client can try reconnect later.
                 throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 0514542..d72f904 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -153,22 +153,17 @@ public class Producer {
         return null;
     }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(producerName);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof Producer) {
-            Producer other = (Producer) obj;
-            return Objects.equals(producerName, other.producerName)
-                    && Objects.equals(topic, other.topic)
-                    && producerId == other.producerId
-                    && Objects.equals(cnx, other.cnx);
-        }
-
-        return false;
+    /**
+     * Method to determine if this producer can replace another producer.
+     * @param other - producer to compare to this one
+     * @return true if this producer is a subsequent instantiation of the same logical producer. Otherwise, false.
+     */
+    public boolean isSuccessorTo(Producer other) {
+        return Objects.equals(producerName, other.producerName)
+                && Objects.equals(topic, other.topic)
+                && producerId == other.producerId
+                && Objects.equals(cnx, other.cnx)
+                && other.getEpoch() < epoch;
     }
 
     public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 5fe651b..b8cbe0b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -442,11 +443,20 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
             // OK
         }
 
-        // 4. simple remove producer
+        // 4. Try to remove with unequal producer
+        Producer producerCopy = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
+                role, false, null, SchemaVersion.Latest, 0, false,
+                ProducerAccessMode.Shared, Optional.empty());
+        topic.removeProducer(producerCopy);
+        // Expect producer to be in map
+        assertEquals(topic.getProducers().size(), 1);
+        assertSame(topic.getProducers().get(producer.getProducerName()), producer);
+
+        // 5. simple remove producer
         topic.removeProducer(producer);
         assertEquals(topic.getProducers().size(), 0);
 
-        // 5. duplicate remove
+        // 6. duplicate remove
         topic.removeProducer(producer); /* noop */
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 80be3ef..4fe14d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -837,6 +837,72 @@ public class ServerCnxTest {
         channel.finish();
     }
 
+    @Test(timeOut = 30000)
+    public void testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail() throws Exception {
+        resetChannel();
+        setChannelConnected();
+
+        // Delay the topic creation in a deterministic way
+        CompletableFuture<Runnable> openTopicFuture = new CompletableFuture<>();
+        doAnswer(invocationOnMock -> {
+            openTopicFuture.complete(() -> {
+                ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+            });
+            return null;
+        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
+                any(OpenLedgerCallback.class), any(Supplier.class), any());
+
+        // In a create producer timeout from client side we expect to see this sequence of commands :
+        // 1. create producer
+        // 2. close producer (when the timeout is triggered, which may be before the producer was created on the broker
+        // 3. create producer (triggered by reconnection logic)
+        // Then, when another producer is created with the same name, it should fail. Because we only have one
+        // channel here, we just use a different producer id
+
+        // These operations need to be serialized, to allow the last create producer to finally succeed
+        // (There can be more create/close pairs in the sequence, depending on the client timeout
+
+        String producerName = "my-producer";
+
+        ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
+                producerName, Collections.emptyMap(), false);
+        channel.writeInbound(createProducer1);
+
+        ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ );
+        channel.writeInbound(closeProducer);
+
+        ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */,
+                producerName, Collections.emptyMap(), false);
+        channel.writeInbound(createProducer2);
+
+        // Complete the topic opening: It will make 2nd producer creation successful
+        openTopicFuture.get().run();
+
+        // Close succeeds
+        Object response = getResponse();
+        assertEquals(response.getClass(), CommandSuccess.class);
+        assertEquals(((CommandSuccess) response).getRequestId(), 2);
+
+        // 2nd producer will be successfully created as topic is open by then
+        response = getResponse();
+        assertEquals(response.getClass(), CommandProducerSuccess.class);
+        assertEquals(((CommandProducerSuccess) response).getRequestId(), 3);
+
+        // Send create command after getting the CommandProducerSuccess to ensure correct ordering
+        ByteBuf createProducer3 = Commands.newProducer(successTopicName, 2 /* producer id */, 4 /* request id */,
+                producerName, Collections.emptyMap(), false);
+        channel.writeInbound(createProducer3);
+
+        // 3nd producer will fail
+        response = getResponse();
+        assertEquals(response.getClass(), CommandError.class);
+        assertEquals(((CommandError) response).getRequestId(), 4);
+
+        assertTrue(channel.isActive());
+
+        channel.finish();
+    }
+
     @Test(timeOut = 30000, enabled = false)
     public void testCreateProducerMultipleTimeouts() throws Exception {
         resetChannel();