You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/09 11:52:38 UTC
[pulsar] 02/02: [Broker] Fix producer getting incorrectly removed from topic's producers map (#12846)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 593c2da8fd4bb8b8838ba9bb1d93c978baccd9ad
Author: Michael Marshall <mi...@datastax.com>
AuthorDate: Thu Nov 18 02:42:56 2021 -0600
[Broker] Fix producer getting incorrectly removed from topic's producers map (#12846)
(cherry picked from commit e33687d3f202ab104d41ad086c48b66b6f0d5ff5)
---
.../pulsar/broker/service/AbstractTopic.java | 8 +--
.../org/apache/pulsar/broker/service/Producer.java | 27 ++++-----
.../pulsar/broker/service/PersistentTopicTest.java | 14 ++++-
.../pulsar/broker/service/ServerCnxTest.java | 66 ++++++++++++++++++++++
4 files changed, 91 insertions(+), 24 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 951d602..9463be9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -631,13 +631,9 @@ public abstract class AbstractTopic implements Topic {
private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
throws BrokerServiceException {
- boolean canOverwrite = false;
- if (oldProducer.equals(newProducer) && !isUserProvidedProducerName(oldProducer)
- && !isUserProvidedProducerName(newProducer) && newProducer.getEpoch() > oldProducer.getEpoch()) {
+ if (newProducer.isSuccessorTo(oldProducer) && !isUserProvidedProducerName(oldProducer)
+ && !isUserProvidedProducerName(newProducer)) {
oldProducer.close(false);
- canOverwrite = true;
- }
- if (canOverwrite) {
if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
// Met concurrent update, throw exception here so that client can try reconnect later.
throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index d13e2f9..64e0fd6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -138,22 +138,17 @@ public class Producer {
this.clientAddress = cnx.clientSourceAddress();
}
- @Override
- public int hashCode() {
- return Objects.hash(producerName);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof Producer) {
- Producer other = (Producer) obj;
- return Objects.equals(producerName, other.producerName)
- && Objects.equals(topic, other.topic)
- && producerId == other.producerId
- && Objects.equals(cnx, other.cnx);
- }
-
- return false;
+ /**
+ * Method to determine if this producer can replace another producer.
+ * @param other - producer to compare to this one
+ * @return true if this producer is a subsequent instantiation of the same logical producer. Otherwise, false.
+ */
+ public boolean isSuccessorTo(Producer other) {
+ return Objects.equals(producerName, other.producerName)
+ && Objects.equals(topic, other.topic)
+ && producerId == other.producerId
+ && Objects.equals(cnx, other.cnx)
+ && other.getEpoch() < epoch;
}
public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 543d897..cb10d72 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -36,6 +36,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -440,11 +441,20 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
// OK
}
- // 4. simple remove producer
+ // 4. Try to remove with unequal producer
+ Producer producerCopy = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
+ role, false, null, SchemaVersion.Latest, 0, false,
+ ProducerAccessMode.Shared, Optional.empty());
+ topic.removeProducer(producerCopy);
+ // Expect producer to be in map
+ assertEquals(topic.getProducers().size(), 1);
+ assertSame(topic.getProducers().get(producer.getProducerName()), producer);
+
+ // 5. simple remove producer
topic.removeProducer(producer);
assertEquals(topic.getProducers().size(), 0);
- // 5. duplicate remove
+ // 6. duplicate remove
topic.removeProducer(producer); /* noop */
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 3d3a30e..8881ce9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -838,6 +838,72 @@ public class ServerCnxTest {
channel.finish();
}
+ @Test(timeOut = 30000)
+ public void testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail() throws Exception {
+ resetChannel();
+ setChannelConnected();
+
+ // Delay the topic creation in a deterministic way
+ CompletableFuture<Runnable> openTopicFuture = new CompletableFuture<>();
+ doAnswer(invocationOnMock -> {
+ openTopicFuture.complete(() -> {
+ ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+ });
+ return null;
+ }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class), any());
+
+ // In a create producer timeout from client side we expect to see this sequence of commands :
+ // 1. create producer
+ // 2. close producer (when the timeout is triggered, which may be before the producer was created on the broker)
+ // 3. create producer (triggered by reconnection logic)
+ // Then, when another producer is created with the same name, it should fail. Because we only have one
+ // channel here, we just use a different producer id
+
+ // These operations need to be serialized, to allow the last create producer to finally succeed
+ // (There can be more create/close pairs in the sequence, depending on the client timeout)
+
+ String producerName = "my-producer";
+
+ ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
+ producerName, Collections.emptyMap(), false);
+ channel.writeInbound(createProducer1);
+
+ ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ );
+ channel.writeInbound(closeProducer);
+
+ ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */,
+ producerName, Collections.emptyMap(), false);
+ channel.writeInbound(createProducer2);
+
+ // Complete the topic opening: It will make 2nd producer creation successful
+ openTopicFuture.get().run();
+
+ // Close succeeds
+ Object response = getResponse();
+ assertEquals(response.getClass(), CommandSuccess.class);
+ assertEquals(((CommandSuccess) response).getRequestId(), 2);
+
+ // 2nd producer will be successfully created as topic is open by then
+ response = getResponse();
+ assertEquals(response.getClass(), CommandProducerSuccess.class);
+ assertEquals(((CommandProducerSuccess) response).getRequestId(), 3);
+
+ // Send create command after getting the CommandProducerSuccess to ensure correct ordering
+ ByteBuf createProducer3 = Commands.newProducer(successTopicName, 2 /* producer id */, 4 /* request id */,
+ producerName, Collections.emptyMap(), false);
+ channel.writeInbound(createProducer3);
+
+ // 3rd producer will fail
+ response = getResponse();
+ assertEquals(response.getClass(), CommandError.class);
+ assertEquals(((CommandError) response).getRequestId(), 4);
+
+ assertTrue(channel.isActive());
+
+ channel.finish();
+ }
+
@Test(timeOut = 30000, enabled = false)
public void testCreateProducerMultipleTimeouts() throws Exception {
resetChannel();