You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/11 14:27:18 UTC

[pulsar] 02/10: Producer getting producer busy is removing existing producer from list (#11804)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a6d94e51b8480ca31ecba9bb08d5ba0430ba6a0e
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Aug 27 02:18:41 2021 -0700

    Producer getting producer busy is removing existing producer from list (#11804)
    
    ### Motivation
    When a producer gets an error because of ProducerBusy (an existing producer with the same name), it triggers a producer close operation that eventually leads to the existing producer being removed from the topic map (even though that producer is still writing to the topic).
    
    The problem is that the producer close triggers a removal from the map, at line 683 (as of commit 43ded59) of
    pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java:
    
     if (producers.remove(producer.getProducerName(), producer)) {
    
    Even though we check for producer equality, Producer.equals() only compares the producer name and topic, so the old instance is considered equal to the new one and gets removed from the map.
    
    Instead, the equality of producer needs to be based on the connection id (local & remote addresses and unique id), plus the producer id within that connection.
    
    * Producer getting producer busy is removing existing producer from list
    
    * Fixed test
    
    (cherry picked from commit 6aef83f3b77c343b9ea3edc1c07dbaf6bac9bd59)
---
 .../org/apache/pulsar/broker/service/Producer.java |  5 ++-
 .../apache/pulsar/broker/service/ServerCnx.java    | 18 +++++++++
 .../broker/service/PersistentTopicE2ETest.java     | 45 ++++++++++++++++++++++
 3 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 3261e96..2995c2c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -137,7 +137,10 @@ public class Producer {
     public boolean equals(Object obj) {
         if (obj instanceof Producer) {
             Producer other = (Producer) obj;
-            return Objects.equals(producerName, other.producerName) && Objects.equals(topic, other.topic);
+            return Objects.equals(producerName, other.producerName)
+                    && Objects.equals(topic, other.topic)
+                    && producerId == other.producerId
+                    && Objects.equals(cnx, other.cnx);
         }
 
         return false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0070c92..3ba6fbd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -42,6 +42,7 @@ import java.net.SocketAddress;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -1403,6 +1404,23 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ServerCnx other = (ServerCnx) o;
+        return Objects.equals(ctx().channel().id(), other.ctx().channel().id());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(ctx().channel().id());
+    }
+
+    @Override
     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
         checkArgument(state == State.Connected);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 43645bf..f9c7e18 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1690,4 +1690,49 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         assertEquals(msg.getValue(), "test");
         assertEquals(msg.getEventTime(), 5);
     }
+
+    @Test
+    public void testProducerBusy() throws Exception {
+        final String topicName = "prop/ns-abc/producer-busy-" + System.nanoTime();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .producerName("xxx")
+                .create();
+
+        assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+
+        for (int i =0; i < 5; i++) {
+            try {
+                pulsarClient.newProducer(Schema.STRING)
+                        .topic(topicName)
+                        .producerName("xxx")
+                        .create();
+                fail("Should have failed");
+            } catch (ProducerBusyException e) {
+                // Expected
+            }
+
+            assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+        }
+
+        // Try from different connection
+        @Cleanup
+        PulsarClient client2 = PulsarClient.builder()
+                .serviceUrl(getPulsar().getBrokerServiceUrl())
+                .build();
+
+        try {
+            client2.newProducer(Schema.STRING)
+                    .topic(topicName)
+                    .producerName("xxx")
+                    .create();
+            fail("Should have failed");
+        } catch (ProducerBusyException e) {
+            // Expected
+        }
+
+        assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+    }
 }