You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/05/20 06:30:34 UTC
[pulsar] branch master updated: PIP-161 Exclusive Producer: ability to fence out an existing Producer (ExclusiveWithFencing mode) (#15488)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bde3972fa48 PIP-161 Exclusive Producer: ability to fence out an existing Producer (ExclusiveWithFencing mode) (#15488)
bde3972fa48 is described below
commit bde3972fa4890f1e5c4b28472802011b26472081
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri May 20 08:30:26 2022 +0200
PIP-161 Exclusive Producer: ability to fence out an existing Producer (ExclusiveWithFencing mode) (#15488)
---
.../pulsar/broker/service/AbstractTopic.java | 51 ++++++++++-
.../broker/service/ExclusiveProducerTest.java | 102 ++++++++++++++++++++-
.../pulsar/client/api/ProducerAccessMode.java | 6 ++
.../apache/pulsar/client/api/ProducerBuilder.java | 2 +
.../apache/pulsar/common/protocol/Commands.java | 4 +
pulsar-common/src/main/proto/PulsarApi.proto | 1 +
site2/docs/concepts-messaging.md | 1 +
7 files changed, 164 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 02c2c3d2f44..ad512949168 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
@@ -738,7 +739,55 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
return topicEpoch;
});
}
-
+ case ExclusiveWithFencing:
+ if (hasExclusiveProducer || !producers.isEmpty()) {
+ // clear all waiting producers
+ // otherwise closing any producer will trigger the promotion
+ // of the next pending producer
+ List<Pair<Producer, CompletableFuture<Optional<Long>>>> waitingExclusiveProducersCopy =
+ new ArrayList<>(waitingExclusiveProducers);
+ waitingExclusiveProducers.clear();
+ waitingExclusiveProducersCopy.forEach((Pair<Producer,
+ CompletableFuture<Optional<Long>>> handle) -> {
+ log.info("[{}] Failing waiting producer {}", topic, handle.getKey());
+ handle.getValue().completeExceptionally(new ProducerFencedException("Fenced out"));
+ handle.getKey().close(true);
+ });
+ producers.forEach((k, currentProducer) -> {
+ log.info("[{}] Fencing out producer {}", topic, currentProducer);
+ currentProducer.close(true);
+ });
+ }
+ if (producer.getTopicEpoch().isPresent()
+ && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
+ // If a producer reconnects, but all the topic epoch has already moved forward,
+ // this producer needs to be fenced, because a new producer had been present in between.
+ hasExclusiveProducer = false;
+ return FutureUtil.failedFuture(new ProducerFencedException(
+ String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d",
+ topicEpoch.get(), producer.getTopicEpoch().get())));
+ } else {
+ // There are currently no existing producers
+ hasExclusiveProducer = true;
+ exclusiveProducerName = producer.getProducerName();
+
+ CompletableFuture<Long> future;
+ if (producer.getTopicEpoch().isPresent()) {
+ future = setTopicEpoch(producer.getTopicEpoch().get());
+ } else {
+ future = incrementTopicEpoch(topicEpoch);
+ }
+ future.exceptionally(ex -> {
+ hasExclusiveProducer = false;
+ exclusiveProducerName = null;
+ return null;
+ });
+
+ return future.thenApply(epoch -> {
+ topicEpoch = Optional.of(epoch);
+ return topicEpoch;
+ });
+ }
case WaitForExclusive: {
if (hasExclusiveProducer || !producers.isEmpty()) {
CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 4d2b16a6a0e..604abd8d709 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -19,10 +19,12 @@
package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import io.netty.util.HashedWheelTimer;
@@ -73,8 +75,10 @@ public class ExclusiveProducerTest extends BrokerTestBase {
// ProducerAccessMode, partitioned
{ ProducerAccessMode.Exclusive, Boolean.TRUE},
{ ProducerAccessMode.Exclusive, Boolean.FALSE },
+ { ProducerAccessMode.ExclusiveWithFencing, Boolean.TRUE},
+ { ProducerAccessMode.ExclusiveWithFencing, Boolean.FALSE },
{ ProducerAccessMode.WaitForExclusive, Boolean.TRUE },
- { ProducerAccessMode.WaitForExclusive, Boolean.FALSE },
+ { ProducerAccessMode.WaitForExclusive, Boolean.FALSE }
};
}
@@ -88,12 +92,14 @@ public class ExclusiveProducerTest extends BrokerTestBase {
Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
+ .producerName("p1")
.accessMode(ProducerAccessMode.Exclusive)
.create();
try {
pulsarClient.newProducer(Schema.STRING)
.topic(topic)
+ .producerName("p-fail-1")
.accessMode(ProducerAccessMode.Exclusive)
.create();
fail("Should have failed");
@@ -104,6 +110,7 @@ public class ExclusiveProducerTest extends BrokerTestBase {
try {
pulsarClient.newProducer(Schema.STRING)
.topic(topic)
+ .producerName("p-fail-2")
.accessMode(ProducerAccessMode.Shared)
.create();
fail("Should have failed");
@@ -116,9 +123,100 @@ public class ExclusiveProducerTest extends BrokerTestBase {
// Now producer should be allowed to get in
Producer<String> p2 = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
+ .producerName("p2")
.accessMode(ProducerAccessMode.Exclusive)
.create();
- p2.close();
+
+ Producer<String> p3 = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .producerName("p3")
+ .accessMode(ProducerAccessMode.ExclusiveWithFencing)
+ .create();
+
+ try {
+ p2.send("test");
+ fail("Should have failed");
+ } catch (ProducerFencedException expected) {
+ }
+
+ // this should work
+ p3.send("test");
+ p3.close();
+
+ // test now WaitForExclusive vs ExclusiveWithFencing
+
+ // use two different Clients, because sometimes fencing a Producer triggers connection close
+ // making the test unreliable.
+
+ @Cleanup
+ PulsarClient pulsarClient2 = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .operationTimeout(2, TimeUnit.SECONDS)
+ .build();
+
+ Producer<String> p4 = pulsarClient2.newProducer(Schema.STRING)
+ .topic(topic)
+ .producerName("p4")
+ .accessMode(ProducerAccessMode.Exclusive)
+ .create();
+
+ p4.send("test");
+
+ // p5 will be waiting for the lock to be released
+ CompletableFuture<Producer<String>> p5 = pulsarClient2.newProducer(Schema.STRING)
+ .topic(topic)
+ .producerName("p5")
+ .accessMode(ProducerAccessMode.WaitForExclusive)
+ .createAsync();
+
+ // wait for all the Producers to be enqueued in order to prevent races
+ Thread.sleep(2000);
+
+ // p6 fences out all the current producers, even p5
+ Producer<String> p6 = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .producerName("p6")
+ .accessMode(ProducerAccessMode.ExclusiveWithFencing)
+ .create();
+
+ p6.send("test");
+
+ // p7 is enqueued after p6
+ CompletableFuture<Producer<String>> p7 = pulsarClient2.newProducer(Schema.STRING)
+ .topic(topic)
+ .producerName("p7")
+ .accessMode(ProducerAccessMode.WaitForExclusive)
+ .createAsync();
+
+ // this should work, p6 is the owner
+ p6.send("test");
+
+ try {
+ p4.send("test");
+ fail("Should have failed");
+ } catch (ProducerFencedException expected) {
+ }
+
+ // this should work, p6 is the owner
+ p6.send("test");
+
+ // p5 fails
+ try {
+ p5.get();
+ fail("Should have failed");
+ } catch (ExecutionException expected) {
+ assertTrue(expected.getCause() instanceof ProducerFencedException,
+ "unexpected exception " + expected.getCause());
+ }
+
+ // this should work, p6 is the owner
+ p6.send("test");
+
+ p6.close();
+
+ // p7 finally acquires the lock
+ p7.get().send("test");
+ p7.get().close();
}
@Test(dataProvider = "topics")
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
index 85199e7cf4c..33492802d16 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
@@ -33,6 +33,12 @@ public enum ProducerAccessMode {
*/
Exclusive,
+ /**
+ * Acquire exclusive access for the producer. Any existing producer will be removed and
+ * invalidated immediately.
+ */
+ ExclusiveWithFencing,
+
/**
* Producer creation is pending until it can acquire exclusive access.
*/
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index cab4d497307..d1778685f84 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -133,6 +133,8 @@ public interface ProducerBuilder<T> extends Cloneable {
* <li>{@link ProducerAccessMode#Shared}: By default multiple producers can publish on a topic
* <li>{@link ProducerAccessMode#Exclusive}: Require exclusive access for producer. Fail immediately if there's
* already a producer connected.
+ * <li>{@link ProducerAccessMode#ExclusiveWithFencing}: Require exclusive access for the producer.
+ * Any existing producer will be removed and invalidated immediately.
* <li>{@link ProducerAccessMode#WaitForExclusive}: Producer creation is pending until it can acquire exclusive
* access
* </ul>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index d1cbe3ad961..2d8e043058d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1853,6 +1853,8 @@ public class Commands {
return org.apache.pulsar.common.api.proto.ProducerAccessMode.Shared;
case WaitForExclusive:
return org.apache.pulsar.common.api.proto.ProducerAccessMode.WaitForExclusive;
+ case ExclusiveWithFencing:
+ return org.apache.pulsar.common.api.proto.ProducerAccessMode.ExclusiveWithFencing;
default:
throw new IllegalArgumentException("Unknonw access mode: " + accessMode);
}
@@ -1867,6 +1869,8 @@ public class Commands {
return ProducerAccessMode.Shared;
case WaitForExclusive:
return ProducerAccessMode.WaitForExclusive;
+ case ExclusiveWithFencing:
+ return ProducerAccessMode.ExclusiveWithFencing;
default:
throw new IllegalArgumentException("Unknonw access mode: " + accessMode);
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index a5d97e51acf..a2bfd76c92c 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -99,6 +99,7 @@ enum ProducerAccessMode {
Shared = 0; // By default multiple producers can publish on a topic
Exclusive = 1; // Require exclusive access for producer. Fail immediately if there's already a producer connected.
WaitForExclusive = 2; // Producer creation is pending until it can acquire exclusive access
+ ExclusiveWithFencing = 3; // Require exclusive access for producer. Fence out old producer.
}
message MessageMetadata {
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index f7df3250762..b3520d70e8b 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -76,6 +76,7 @@ Access mode | Description
:-----------|------------
`Shared` | Multiple producers can publish on a topic. <br /><br />This is the **default** setting.
`Exclusive` | Only one producer can publish on a topic. <br /><br />If there is already a producer connected, other producers trying to publish on this topic get errors immediately.<br /><br />The “old” producer is evicted and a “new” producer is selected to be the next exclusive producer if the “old” producer experiences a network partition with the broker.
+`ExclusiveWithFencing`|Only one producer can publish on a topic. <br /><br />If there is already a producer connected, it will be removed and invalidated immediately.
`WaitForExclusive` | If there is already a producer connected, the producer creation is pending (rather than timing out) until the producer gets the `Exclusive` access.<br /><br />The producer that succeeds in becoming the exclusive one is treated as the leader. Consequently, if you want to implement a leader election scheme for your application, you can use this access mode. Note that the leader pattern scheme mentioned refers to using Pulsar as a Write-Ahead Log (WAL) which means the l [...]
:::note