You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/25 16:55:29 UTC

[GitHub] [pulsar] merlimat commented on a change in pull request #8685: PIP 68: Exclusive Producer

merlimat commented on a change in pull request #8685:
URL: https://github.com/apache/pulsar/pull/8685#discussion_r530518677



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -317,6 +332,94 @@ public String getReplicatorPrefix() {
                 .checkConsumerCompatibility(id, schema, schemaCompatibilityStrategy);
     }
 
+    @Override // Adds the given producer to this topic once the access-mode / epoch check has passed.
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer) { // future yields the epoch, or the failure
+        checkArgument(producer.getTopic() == this); // the producer must reference this exact topic instance
+
+        CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); // returned to the caller; completed in the callbacks below
+
+        incrementTopicEpochIfNeeded(producer) // may fail, e.g. with ProducerBusyException or ProducerFencedException
+                .thenAccept(epoch -> {
+                    lock.readLock().lock(); // held while the topic state is validated and the producer is registered
+                    try {
+                        brokerService.checkTopicNsOwnership(getName());
+                        checkTopicFenced();
+                        if (isTerminated()) { // a terminated topic rejects new producers
+                            log.warn("[{}] Attempting to add producer to a terminated topic", topic);
+                            throw new TopicTerminatedException("Topic was already terminated");
+                        }
+                        internalAddProducer(producer);
+
+                        USAGE_COUNT_UPDATER.incrementAndGet(this); // only reached when internalAddProducer returned normally
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
+                                    USAGE_COUNT_UPDATER.get(this));
+                        }
+
+                        future.complete(epoch); // success: hand the epoch obtained above to the caller
+                    } catch (Throwable e) { // any failure above is reported through the future instead of being thrown
+                        future.completeExceptionally(e);
+                    } finally {
+                        lock.readLock().unlock(); // always release the read lock, on success and on failure
+                    }
+                }).exceptionally(ex -> { // runs when the stage above completed exceptionally, e.g. the epoch future failed
+                    future.completeExceptionally(ex); // NOTE(review): ex may arrive wrapped in CompletionException — confirm callers unwrap it
+                    return null;
+                });
+
+        return future;
+    }
+
+    protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer) {
+        lock.writeLock().lock();
+        try {
+            switch (producer.getAccessMode()) {
+            case Shared:
+                if (hasExclusiveProducer) {
+                   return FutureUtil.failedFuture(new ProducerBusyException("Topic has an existing exclusive producer"));
+                } else {
+                    // Normal producer getting added, we don't need a new epoch
+                    return CompletableFuture.completedFuture(topicEpoch);
+                }
+
+            case Exclusive:
+                 if (hasExclusiveProducer || !producers.isEmpty()) {
+                    return FutureUtil.failedFuture(new ProducerFencedException("Topic has existing producers"));
+                 } else if (producer.getTopicEpoch().isPresent() && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)){
+                     // If a producer reconnects but the topic epoch has already moved forward, this producer needs to
+                     // be fenced, because a new producer had been present in between.
+                     return FutureUtil.failedFuture(new ProducerFencedException("Topic epoch has already moved"));

Review comment:
       Sure




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org