You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/18 08:44:37 UTC
[pulsar] branch master updated: [Broker] Fix producer getting incorrectly removed from topic's producers map (#12846)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e33687d [Broker] Fix producer getting incorrectly removed from topic's producers map (#12846)
e33687d is described below
commit e33687d3f202ab104d41ad086c48b66b6f0d5ff5
Author: Michael Marshall <mi...@datastax.com>
AuthorDate: Thu Nov 18 02:42:56 2021 -0600
[Broker] Fix producer getting incorrectly removed from topic's producers map (#12846)
---
.../pulsar/broker/service/AbstractTopic.java | 8 +--
.../org/apache/pulsar/broker/service/Producer.java | 27 ++++-----
.../pulsar/broker/service/PersistentTopicTest.java | 14 ++++-
.../pulsar/broker/service/ServerCnxTest.java | 66 ++++++++++++++++++++++
4 files changed, 91 insertions(+), 24 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index f4f7615..12bf097 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -639,13 +639,9 @@ public abstract class AbstractTopic implements Topic {
private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
throws BrokerServiceException {
- boolean canOverwrite = false;
- if (oldProducer.equals(newProducer) && !isUserProvidedProducerName(oldProducer)
- && !isUserProvidedProducerName(newProducer) && newProducer.getEpoch() > oldProducer.getEpoch()) {
+ if (newProducer.isSuccessorTo(oldProducer) && !isUserProvidedProducerName(oldProducer)
+ && !isUserProvidedProducerName(newProducer)) {
oldProducer.close(false);
- canOverwrite = true;
- }
- if (canOverwrite) {
if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
// Met concurrent update, throw exception here so that client can try reconnect later.
throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 0514542..d72f904 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -153,22 +153,17 @@ public class Producer {
return null;
}
- @Override
- public int hashCode() {
- return Objects.hash(producerName);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof Producer) {
- Producer other = (Producer) obj;
- return Objects.equals(producerName, other.producerName)
- && Objects.equals(topic, other.topic)
- && producerId == other.producerId
- && Objects.equals(cnx, other.cnx);
- }
-
- return false;
+ /**
+ * Method to determine if this producer can replace another producer.
+ * @param other - producer to compare to this one
+ * @return true if this producer is a subsequent instantiation of the same logical producer. Otherwise, false.
+ */
+ public boolean isSuccessorTo(Producer other) {
+ return Objects.equals(producerName, other.producerName)
+ && Objects.equals(topic, other.topic)
+ && producerId == other.producerId
+ && Objects.equals(cnx, other.cnx)
+ && other.getEpoch() < epoch;
}
public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 5fe651b..b8cbe0b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -442,11 +443,20 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
// OK
}
- // 4. simple remove producer
+ // 4. Try to remove with unequal producer
+ Producer producerCopy = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
+ role, false, null, SchemaVersion.Latest, 0, false,
+ ProducerAccessMode.Shared, Optional.empty());
+ topic.removeProducer(producerCopy);
+ // Expect producer to be in map
+ assertEquals(topic.getProducers().size(), 1);
+ assertSame(topic.getProducers().get(producer.getProducerName()), producer);
+
+ // 5. simple remove producer
topic.removeProducer(producer);
assertEquals(topic.getProducers().size(), 0);
- // 5. duplicate remove
+ // 6. duplicate remove
topic.removeProducer(producer); /* noop */
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 80be3ef..4fe14d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -837,6 +837,72 @@ public class ServerCnxTest {
channel.finish();
}
+ @Test(timeOut = 30000)
+ public void testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail() throws Exception {
+ resetChannel();
+ setChannelConnected();
+
+ // Delay the topic creation in a deterministic way
+ CompletableFuture<Runnable> openTopicFuture = new CompletableFuture<>();
+ doAnswer(invocationOnMock -> {
+ openTopicFuture.complete(() -> {
+ ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+ });
+ return null;
+ }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class), any());
+
+ // In a create producer timeout from client side we expect to see this sequence of commands :
+ // 1. create producer
+ // 2. close producer (when the timeout is triggered, which may be before the producer was created on the broker)
+ // 3. create producer (triggered by reconnection logic)
+ // Then, when another producer is created with the same name, it should fail. Because we only have one
+ // channel here, we just use a different producer id
+
+ // These operations need to be serialized, to allow the last create producer to finally succeed
+ // (There can be more create/close pairs in the sequence, depending on the client timeout)
+
+ String producerName = "my-producer";
+
+ ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
+ producerName, Collections.emptyMap(), false);
+ channel.writeInbound(createProducer1);
+
+ ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ );
+ channel.writeInbound(closeProducer);
+
+ ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */,
+ producerName, Collections.emptyMap(), false);
+ channel.writeInbound(createProducer2);
+
+ // Complete the topic opening: It will make 2nd producer creation successful
+ openTopicFuture.get().run();
+
+ // Close succeeds
+ Object response = getResponse();
+ assertEquals(response.getClass(), CommandSuccess.class);
+ assertEquals(((CommandSuccess) response).getRequestId(), 2);
+
+ // 2nd producer will be successfully created as topic is open by then
+ response = getResponse();
+ assertEquals(response.getClass(), CommandProducerSuccess.class);
+ assertEquals(((CommandProducerSuccess) response).getRequestId(), 3);
+
+ // Send create command after getting the CommandProducerSuccess to ensure correct ordering
+ ByteBuf createProducer3 = Commands.newProducer(successTopicName, 2 /* producer id */, 4 /* request id */,
+ producerName, Collections.emptyMap(), false);
+ channel.writeInbound(createProducer3);
+
+ // 3rd producer will fail
+ response = getResponse();
+ assertEquals(response.getClass(), CommandError.class);
+ assertEquals(((CommandError) response).getRequestId(), 4);
+
+ assertTrue(channel.isActive());
+
+ channel.finish();
+ }
+
@Test(timeOut = 30000, enabled = false)
public void testCreateProducerMultipleTimeouts() throws Exception {
resetChannel();