You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/25 08:03:46 UTC

[GitHub] [pulsar] rdhabalia commented on a change in pull request #8685: PIP 68: Exclusive Producer

rdhabalia commented on a change in pull request #8685:
URL: https://github.com/apache/pulsar/pull/8685#discussion_r530165563



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -317,6 +332,94 @@ public String getReplicatorPrefix() {
                 .checkConsumerCompatibility(id, schema, schemaCompatibilityStrategy);
     }
 
+    @Override
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+        checkArgument(producer.getTopic() == this);
+
+        CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+
+        incrementTopicEpochIfNeeded(producer)
+                .thenAccept(epoch -> {
+                    lock.readLock().lock();
+                    try {
+                        brokerService.checkTopicNsOwnership(getName());
+                        checkTopicFenced();
+                        if (isTerminated()) {
+                            log.warn("[{}] Attempting to add producer to a terminated topic", topic);
+                            throw new TopicTerminatedException("Topic was already terminated");
+                        }
+                        internalAddProducer(producer);
+
+                        USAGE_COUNT_UPDATER.incrementAndGet(this);
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
+                                    USAGE_COUNT_UPDATER.get(this));
+                        }
+
+                        future.complete(epoch);
+                    } catch (Throwable e) {
+                        future.completeExceptionally(e);
+                    } finally {
+                        lock.readLock().unlock();
+                    }
+                }).exceptionally(ex -> {
+                    future.completeExceptionally(ex);
+                    return null;
+                });
+
+        return future;
+    }
+
+    protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer) {
+        lock.writeLock().lock();
+        try {
+            switch (producer.getAccessMode()) {
+            case Shared:
+                if (hasExclusiveProducer) {
+                   return FutureUtil.failedFuture(new ProducerBusyException("Topic has an existing exclusive producer"));
+                } else {
+                    // Normal producer getting added, we don't need a new epoch
+                    return CompletableFuture.completedFuture(topicEpoch);
+                }
+
+            case Exclusive:
+                 if (hasExclusiveProducer || !producers.isEmpty()) {
+                    return FutureUtil.failedFuture(new ProducerFencedException("Topic has existing producers"));
+                 } else if (producer.getTopicEpoch().isPresent() && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)){
+                     // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs to
+                     // be fenced, because a new producer had been present in between.
+                     return FutureUtil.failedFuture(new ProducerFencedException("Topic epoch has already moved"));

Review comment:
       can we add debug log here with epoch.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -317,6 +332,94 @@ public String getReplicatorPrefix() {
                 .checkConsumerCompatibility(id, schema, schemaCompatibilityStrategy);
     }
 
+    @Override
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+        checkArgument(producer.getTopic() == this);
+
+        CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+
+        incrementTopicEpochIfNeeded(producer)
+                .thenAccept(epoch -> {
+                    lock.readLock().lock();
+                    try {
+                        brokerService.checkTopicNsOwnership(getName());
+                        checkTopicFenced();
+                        if (isTerminated()) {
+                            log.warn("[{}] Attempting to add producer to a terminated topic", topic);
+                            throw new TopicTerminatedException("Topic was already terminated");
+                        }
+                        internalAddProducer(producer);

Review comment:
       is it possible to add producer-type if `topic-stats` if producer is exclusive for information and debugging.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -317,6 +332,94 @@ public String getReplicatorPrefix() {
                 .checkConsumerCompatibility(id, schema, schemaCompatibilityStrategy);
     }
 
+    @Override
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+        checkArgument(producer.getTopic() == this);
+
+        CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+
+        incrementTopicEpochIfNeeded(producer)
+                .thenAccept(epoch -> {
+                    lock.readLock().lock();
+                    try {
+                        brokerService.checkTopicNsOwnership(getName());
+                        checkTopicFenced();
+                        if (isTerminated()) {
+                            log.warn("[{}] Attempting to add producer to a terminated topic", topic);
+                            throw new TopicTerminatedException("Topic was already terminated");
+                        }
+                        internalAddProducer(producer);
+
+                        USAGE_COUNT_UPDATER.incrementAndGet(this);
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
+                                    USAGE_COUNT_UPDATER.get(this));
+                        }
+
+                        future.complete(epoch);
+                    } catch (Throwable e) {
+                        future.completeExceptionally(e);
+                    } finally {
+                        lock.readLock().unlock();
+                    }
+                }).exceptionally(ex -> {
+                    future.completeExceptionally(ex);
+                    return null;
+                });
+
+        return future;
+    }
+
+    protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer) {
+        lock.writeLock().lock();
+        try {
+            switch (producer.getAccessMode()) {
+            case Shared:
+                if (hasExclusiveProducer) {
+                   return FutureUtil.failedFuture(new ProducerBusyException("Topic has an existing exclusive producer"));
+                } else {
+                    // Normal producer getting added, we don't need a new epoch
+                    return CompletableFuture.completedFuture(topicEpoch);
+                }
+
+            case Exclusive:
+                 if (hasExclusiveProducer || !producers.isEmpty()) {
+                    return FutureUtil.failedFuture(new ProducerFencedException("Topic has existing producers"));
+                 } else if (producer.getTopicEpoch().isPresent() && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)){
+                     // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs to
+                     // be fenced, because a new producer had been present in between.
+                     return FutureUtil.failedFuture(new ProducerFencedException("Topic epoch has already moved"));
+                } else {
+                    // There are currently no existing producers
+                    hasExclusiveProducer = true;
+
+                    CompletableFuture<Optional<Long>> future = incrementTopicEpoch(topicEpoch).thenApply(epoch -> {
+                        topicEpoch = Optional.of(epoch);
+                        return topicEpoch;
+                    });
+
+                    future.exceptionally(ex -> {
+                        hasExclusiveProducer = false;
+                        return null;
+                    });
+                    return future;
+                }
+
+           // case WaitForExclusive:

Review comment:
       I have one suggestion. I am sure you must have thought about it but can't we rename `WaitForExclusive` with `FailOver` as it will be consistent to subscription type name and it will need no explanation and easy to understand.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -451,7 +554,44 @@ private boolean isUserProvidedProducerName(Producer producer){
         return producer.isUserProvidedProducerName() && !producer.getProducerName().startsWith(replicatorPrefix);
     }
 
-    protected abstract void handleProducerRemoved(Producer producer);
+
+    @Override
+    public void removeProducer(Producer producer) {
+        checkArgument(producer.getTopic() == this);
+
+        if (producers.remove(producer.getProducerName(), producer)) {
+            handleProducerRemoved(producer);

Review comment:
       shouldn't we need lock here else it may create a race condition and  producer with `WaitForExclusive ` may wait forever.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -317,6 +332,94 @@ public String getReplicatorPrefix() {
                 .checkConsumerCompatibility(id, schema, schemaCompatibilityStrategy);
     }
 
+    @Override
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+        checkArgument(producer.getTopic() == this);
+
+        CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+
+        incrementTopicEpochIfNeeded(producer)
+                .thenAccept(epoch -> {
+                    lock.readLock().lock();
+                    try {
+                        brokerService.checkTopicNsOwnership(getName());
+                        checkTopicFenced();
+                        if (isTerminated()) {
+                            log.warn("[{}] Attempting to add producer to a terminated topic", topic);
+                            throw new TopicTerminatedException("Topic was already terminated");
+                        }
+                        internalAddProducer(producer);
+
+                        USAGE_COUNT_UPDATER.incrementAndGet(this);
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
+                                    USAGE_COUNT_UPDATER.get(this));
+                        }
+
+                        future.complete(epoch);
+                    } catch (Throwable e) {
+                        future.completeExceptionally(e);
+                    } finally {
+                        lock.readLock().unlock();
+                    }
+                }).exceptionally(ex -> {
+                    future.completeExceptionally(ex);
+                    return null;
+                });
+
+        return future;
+    }
+
+    protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer) {
+        lock.writeLock().lock();

Review comment:
       can we avoid locking for normal producer usecase? 
   ```
   if (producers.isEmpty() && producer.getAccessMode() == Shared) {
   return CompletableFuture.completedFuture(topicEpoch);
   }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org