You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/05/20 06:30:34 UTC

[pulsar] branch master updated: PIP-161 Exclusive Producer: ability to fence out an existing Producer (ExclusiveWithFencing mode) (#15488)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bde3972fa48 PIP-161 Exclusive Producer: ability to fence out an existing Producer (ExclusiveWithFencing mode) (#15488)
bde3972fa48 is described below

commit bde3972fa4890f1e5c4b28472802011b26472081
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri May 20 08:30:26 2022 +0200

    PIP-161 Exclusive Producer: ability to fence out an existing Producer (ExclusiveWithFencing mode) (#15488)
---
 .../pulsar/broker/service/AbstractTopic.java       |  51 ++++++++++-
 .../broker/service/ExclusiveProducerTest.java      | 102 ++++++++++++++++++++-
 .../pulsar/client/api/ProducerAccessMode.java      |   6 ++
 .../apache/pulsar/client/api/ProducerBuilder.java  |   2 +
 .../apache/pulsar/common/protocol/Commands.java    |   4 +
 pulsar-common/src/main/proto/PulsarApi.proto       |   1 +
 site2/docs/concepts-messaging.md                   |   1 +
 7 files changed, 164 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 02c2c3d2f44..ad512949168 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -738,7 +739,55 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
                         return topicEpoch;
                     });
                 }
-
+                case ExclusiveWithFencing:
+                    if (hasExclusiveProducer || !producers.isEmpty()) {
+                        // clear all waiting producers
+                        // otherwise closing any producer will trigger the promotion
+                        // of the next pending producer
+                        List<Pair<Producer, CompletableFuture<Optional<Long>>>> waitingExclusiveProducersCopy =
+                                new ArrayList<>(waitingExclusiveProducers);
+                        waitingExclusiveProducers.clear();
+                        waitingExclusiveProducersCopy.forEach((Pair<Producer,
+                                                               CompletableFuture<Optional<Long>>> handle) -> {
+                            log.info("[{}] Failing waiting producer {}", topic, handle.getKey());
+                            handle.getValue().completeExceptionally(new ProducerFencedException("Fenced out"));
+                            handle.getKey().close(true);
+                        });
+                        producers.forEach((k, currentProducer) -> {
+                            log.info("[{}] Fencing out producer {}", topic, currentProducer);
+                            currentProducer.close(true);
+                        });
+                    }
+                    if (producer.getTopicEpoch().isPresent()
+                            && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
+                        // If a producer reconnects, but the topic epoch has already moved forward,
+                        // this producer needs to be fenced, because a new producer has been present in between.
+                        hasExclusiveProducer = false;
+                        return FutureUtil.failedFuture(new ProducerFencedException(
+                                String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d",
+                                        topicEpoch.get(), producer.getTopicEpoch().get())));
+                    } else {
+                        // Existing producers (if any) were closed above: this producer becomes the owner
+                        hasExclusiveProducer = true;
+                        exclusiveProducerName = producer.getProducerName();
+
+                        CompletableFuture<Long> future;
+                        if (producer.getTopicEpoch().isPresent()) {
+                            future = setTopicEpoch(producer.getTopicEpoch().get());
+                        } else {
+                            future = incrementTopicEpoch(topicEpoch);
+                        }
+                        future.exceptionally(ex -> {
+                            hasExclusiveProducer = false;
+                            exclusiveProducerName = null;
+                            return null;
+                        });
+
+                        return future.thenApply(epoch -> {
+                            topicEpoch = Optional.of(epoch);
+                            return topicEpoch;
+                        });
+                    }
             case WaitForExclusive: {
                 if (hasExclusiveProducer || !producers.isEmpty()) {
                     CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 4d2b16a6a0e..604abd8d709 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -19,10 +19,12 @@
 package org.apache.pulsar.broker.service;
 
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.util.HashedWheelTimer;
@@ -73,8 +75,10 @@ public class ExclusiveProducerTest extends BrokerTestBase {
                 // ProducerAccessMode, partitioned
                 { ProducerAccessMode.Exclusive, Boolean.TRUE},
                 { ProducerAccessMode.Exclusive, Boolean.FALSE },
+                { ProducerAccessMode.ExclusiveWithFencing, Boolean.TRUE},
+                { ProducerAccessMode.ExclusiveWithFencing, Boolean.FALSE },
                 { ProducerAccessMode.WaitForExclusive, Boolean.TRUE },
-                { ProducerAccessMode.WaitForExclusive, Boolean.FALSE },
+                { ProducerAccessMode.WaitForExclusive, Boolean.FALSE }
         };
     }
 
@@ -88,12 +92,14 @@ public class ExclusiveProducerTest extends BrokerTestBase {
 
         Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
                 .topic(topic)
+                .producerName("p1")
                 .accessMode(ProducerAccessMode.Exclusive)
                 .create();
 
         try {
             pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
+                    .producerName("p-fail-1")
                     .accessMode(ProducerAccessMode.Exclusive)
                     .create();
             fail("Should have failed");
@@ -104,6 +110,7 @@ public class ExclusiveProducerTest extends BrokerTestBase {
         try {
             pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
+                    .producerName("p-fail-2")
                     .accessMode(ProducerAccessMode.Shared)
                     .create();
             fail("Should have failed");
@@ -116,9 +123,100 @@ public class ExclusiveProducerTest extends BrokerTestBase {
         // Now producer should be allowed to get in
         Producer<String> p2 = pulsarClient.newProducer(Schema.STRING)
                 .topic(topic)
+                .producerName("p2")
                 .accessMode(ProducerAccessMode.Exclusive)
                 .create();
-        p2.close();
+
+        Producer<String> p3 = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .producerName("p3")
+                .accessMode(ProducerAccessMode.ExclusiveWithFencing)
+                .create();
+
+        try {
+            p2.send("test");
+            fail("Should have failed");
+        } catch (ProducerFencedException expected) {
+        }
+
+        // this should work
+        p3.send("test");
+        p3.close();
+
+        // test now WaitForExclusive vs ExclusiveWithFencing
+
+        // use two different Clients, because sometimes fencing a Producer triggers connection close
+        // making the test unreliable.
+
+        @Cleanup
+        PulsarClient pulsarClient2 = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .operationTimeout(2, TimeUnit.SECONDS)
+                .build();
+
+        Producer<String> p4 = pulsarClient2.newProducer(Schema.STRING)
+                .topic(topic)
+                .producerName("p4")
+                .accessMode(ProducerAccessMode.Exclusive)
+                .create();
+
+        p4.send("test");
+
+        // p5 will be waiting for the lock to be released
+        CompletableFuture<Producer<String>> p5 = pulsarClient2.newProducer(Schema.STRING)
+                .topic(topic)
+                .producerName("p5")
+                .accessMode(ProducerAccessMode.WaitForExclusive)
+                .createAsync();
+
+        // wait for all the Producers to be enqueued in order to prevent races
+        Thread.sleep(2000);
+
+        // p6 fences out all the current producers, even p5
+        Producer<String> p6 = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .producerName("p6")
+                .accessMode(ProducerAccessMode.ExclusiveWithFencing)
+                .create();
+
+        p6.send("test");
+
+        // p7 is enqueued after p6
+        CompletableFuture<Producer<String>> p7 = pulsarClient2.newProducer(Schema.STRING)
+                .topic(topic)
+                .producerName("p7")
+                .accessMode(ProducerAccessMode.WaitForExclusive)
+                .createAsync();
+
+        // this should work, p6 is the owner
+        p6.send("test");
+
+        try {
+            p4.send("test");
+            fail("Should have failed");
+        } catch (ProducerFencedException expected) {
+        }
+
+        // this should work, p6 is the owner
+        p6.send("test");
+
+        // p5 fails
+        try {
+            p5.get();
+            fail("Should have failed");
+        } catch (ExecutionException expected) {
+            assertTrue(expected.getCause() instanceof ProducerFencedException,
+                    "unexpected exception " + expected.getCause());
+        }
+
+        // this should work, p6 is the owner
+        p6.send("test");
+
+        p6.close();
+
+        // p7 finally acquires the lock
+        p7.get().send("test");
+        p7.get().close();
     }
 
     @Test(dataProvider = "topics")
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
index 85199e7cf4c..33492802d16 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
@@ -33,6 +33,12 @@ public enum ProducerAccessMode {
      */
     Exclusive,
 
+    /**
+     * Acquire exclusive access for the producer. Any existing producer will be removed and
+     * invalidated immediately.
+     */
+    ExclusiveWithFencing,
+
     /**
      * Producer creation is pending until it can acquire exclusive access.
      */
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index cab4d497307..d1778685f84 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -133,6 +133,8 @@ public interface ProducerBuilder<T> extends Cloneable {
      * <li>{@link ProducerAccessMode#Shared}: By default multiple producers can publish on a topic
      * <li>{@link ProducerAccessMode#Exclusive}: Require exclusive access for producer. Fail immediately if there's
      * already a producer connected.
+     * <li>{@link ProducerAccessMode#ExclusiveWithFencing}: Require exclusive access for the producer.
+     * Any existing producer will be removed and invalidated immediately.
      * <li>{@link ProducerAccessMode#WaitForExclusive}: Producer creation is pending until it can acquire exclusive
      * access
      * </ul>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index d1cbe3ad961..2d8e043058d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1853,6 +1853,8 @@ public class Commands {
             return org.apache.pulsar.common.api.proto.ProducerAccessMode.Shared;
         case WaitForExclusive:
             return org.apache.pulsar.common.api.proto.ProducerAccessMode.WaitForExclusive;
+        case ExclusiveWithFencing:
+            return org.apache.pulsar.common.api.proto.ProducerAccessMode.ExclusiveWithFencing;
         default:
             throw new IllegalArgumentException("Unknonw access mode: " + accessMode);
         }
@@ -1867,6 +1869,8 @@ public class Commands {
             return ProducerAccessMode.Shared;
         case WaitForExclusive:
             return ProducerAccessMode.WaitForExclusive;
+        case ExclusiveWithFencing:
+            return ProducerAccessMode.ExclusiveWithFencing;
         default:
             throw new IllegalArgumentException("Unknonw access mode: " + accessMode);
         }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index a5d97e51acf..a2bfd76c92c 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -99,6 +99,7 @@ enum ProducerAccessMode {
     Shared = 0; // By default multiple producers can publish on a topic
     Exclusive = 1; // Require exclusive access for producer. Fail immediately if there's already a producer connected.
     WaitForExclusive = 2; // Producer creation is pending until it can acquire exclusive access
+    ExclusiveWithFencing = 3; // Require exclusive access for producer. Fence out old producer.
 }
 
 message MessageMetadata {
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index f7df3250762..b3520d70e8b 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -76,6 +76,7 @@ Access mode | Description
 :-----------|------------
 `Shared`           | Multiple producers can publish on a topic. <br /><br />This is the **default** setting.
 `Exclusive`        | Only one producer can publish on a topic. <br /><br />If there is already a producer connected, other producers trying to publish on this topic get errors immediately.<br /><br />The “old” producer is evicted and a “new” producer is selected to be the next exclusive producer if the “old” producer experiences a network partition with the broker.
+`ExclusiveWithFencing`|Only one producer can publish on a topic. <br /><br />If there is already a producer connected, it will be removed and invalidated immediately.
 `WaitForExclusive` | If there is already a producer connected, the producer creation is pending (rather than timing out) until the producer gets the `Exclusive` access.<br /><br />The producer that succeeds in becoming the exclusive one is treated as the leader. Consequently, if you want to implement a leader election scheme for your application, you can use this access mode. Note that the leader pattern scheme mentioned refers to using Pulsar as a Write-Ahead Log (WAL) which means the l [...]
 
 :::note