You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/25 08:20:11 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #8685: PIP 68: Exclusive Producer

eolivelli commented on a change in pull request #8685:
URL: https://github.com/apache/pulsar/pull/8685#discussion_r530175511



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -317,6 +332,94 @@ public String getReplicatorPrefix() {
                 .checkConsumerCompatibility(id, schema, schemaCompatibilityStrategy);
     }
 
+    @Override
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+        checkArgument(producer.getTopic() == this);
+
+        CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+
+        incrementTopicEpochIfNeeded(producer)
+                .thenAccept(epoch -> {
+                    lock.readLock().lock();
+                    try {
+                        brokerService.checkTopicNsOwnership(getName());
+                        checkTopicFenced();
+                        if (isTerminated()) {
+                            log.warn("[{}] Attempting to add producer to a terminated topic", topic);
+                            throw new TopicTerminatedException("Topic was already terminated");
+                        }
+                        internalAddProducer(producer);
+
+                        USAGE_COUNT_UPDATER.incrementAndGet(this);
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
+                                    USAGE_COUNT_UPDATER.get(this));
+                        }
+
+                        future.complete(epoch);
+                    } catch (Throwable e) {
+                        future.completeExceptionally(e);
+                    } finally {
+                        lock.readLock().unlock();
+                    }
+                }).exceptionally(ex -> {
+                    future.completeExceptionally(ex);
+                    return null;
+                });
+
+        return future;
+    }
+
+    protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer) {
+        lock.writeLock().lock();
+        try {
+            switch (producer.getAccessMode()) {
+            case Shared:
+                if (hasExclusiveProducer) {
+                   return FutureUtil.failedFuture(new ProducerBusyException("Topic has an existing exclusive producer"));

Review comment:
       is it possibile to log and/or to report in the exception who is the current exclusive producer ? 

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -460,33 +460,37 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx)
     }
 
     @Override
-    public void addProducer(Producer producer) throws BrokerServiceException {
-        checkArgument(producer.getTopic() == this);
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+        return super.addProducer(producer).thenApply(epoch -> {
+            messageDeduplication.producerAdded(producer.getProducerName());
 
-        lock.readLock().lock();
-        try {
-            brokerService.checkTopicNsOwnership(getName());
+            // Start replication producers if not already
+            startReplProducers();
+            return epoch;
+        });
+    }
 
-            checkTopicFenced();
+    private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch";

Review comment:
       nit: please move constants on top of the classs

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -317,6 +332,94 @@ public String getReplicatorPrefix() {
                 .checkConsumerCompatibility(id, schema, schemaCompatibilityStrategy);
     }
 
+    @Override
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+        checkArgument(producer.getTopic() == this);
+
+        CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+
+        incrementTopicEpochIfNeeded(producer)
+                .thenAccept(epoch -> {
+                    lock.readLock().lock();
+                    try {
+                        brokerService.checkTopicNsOwnership(getName());
+                        checkTopicFenced();
+                        if (isTerminated()) {
+                            log.warn("[{}] Attempting to add producer to a terminated topic", topic);
+                            throw new TopicTerminatedException("Topic was already terminated");
+                        }
+                        internalAddProducer(producer);
+
+                        USAGE_COUNT_UPDATER.incrementAndGet(this);
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
+                                    USAGE_COUNT_UPDATER.get(this));
+                        }
+
+                        future.complete(epoch);
+                    } catch (Throwable e) {
+                        future.completeExceptionally(e);
+                    } finally {
+                        lock.readLock().unlock();
+                    }
+                }).exceptionally(ex -> {
+                    future.completeExceptionally(ex);
+                    return null;
+                });
+
+        return future;
+    }
+
+    protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer) {
+        lock.writeLock().lock();

Review comment:
       +1 to @rdhabalia suggestion




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org