You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/11 09:50:58 UTC

[pulsar] branch master updated: Close producer when the topic does not exists. (#6879)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6eed217  Close producer when the topic does not exists. (#6879)
6eed217 is described below

commit 6eed21739e5c33e5064cd34cf0da0d93a991b1e5
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon May 11 17:50:39 2020 +0800

    Close producer when the topic does not exists. (#6879)
    
    Fixes #6838
    
    ### Motivation
    
    Close producer when the topic does not exists.
    
    ### Modifications
    
    1. Fix exception handle for the topic does not exist.
    2. Change state to Close when producer got TopicDoesNotExists exception so that the producer can close the cnx and will no longer add send timeout tasks to HashedWheelTimer.
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  1 -
 .../apache/pulsar/broker/service/ServerCnx.java    |  6 ++
 .../pulsar/client/impl/TopicDoesNotExistsTest.java | 70 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 15 ++++-
 4 files changed, 90 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b53e12e..b205d46 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -67,7 +67,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.admin.ZkAdminPaths;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1ca4f25..1eefc37 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -37,6 +37,7 @@ import io.netty.handler.ssl.SslHandler;
 import java.net.SocketAddress;
 
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
@@ -1117,6 +1118,11 @@ public class ServerCnx extends PulsarHandler {
                             });
                         }).exceptionally(exception -> {
                             Throwable cause = exception.getCause();
+
+                            if (cause instanceof NoSuchElementException) {
+                                cause = new TopicNotFoundException("Topic Not Found.");
+                            }
+
                             if (!(cause instanceof ServiceUnitNotReadyException)) {
                                 // Do not print stack traces for expected exceptions
                                 log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
new file mode 100644
index 0000000..adaa831
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import io.netty.util.HashedWheelTimer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for not exists topic.
+ */
+public class TopicDoesNotExistsTest extends ProducerConsumerBase {
+
+    @Override
+    @BeforeClass
+    public void setup() throws Exception {
+        conf.setAllowAutoTopicCreation(false);
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterClass
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(expectedExceptions = PulsarClientException.TopicDoesNotExistException.class)
+    public void testCreateProducerOnNotExistsTopic() throws PulsarClientException, InterruptedException {
+        pulsarClient.newProducer()
+                .topic("persistent://public/default/" + UUID.randomUUID().toString())
+                .sendTimeout(1, TimeUnit.SECONDS)
+                .create();
+        Thread.sleep(2000);
+        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
+        Assert.assertEquals(timer.pendingTimeouts(), 0);
+    }
+
+    @Test(expectedExceptions = PulsarClientException.TopicDoesNotExistException.class)
+    public void testCreateConsumerOnNotExistsTopic() throws PulsarClientException {
+        pulsarClient.newConsumer()
+                .topic("persistent://public/default/" + UUID.randomUUID().toString())
+                .subscriptionName("test")
+                .subscribe();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index f7768a4..c249d8f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1177,7 +1177,20 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                         return null;
                     }
                     log.error("[{}] [{}] Failed to create producer: {}", topic, producerName, cause.getMessage());
-
+                    // Close the producer since topic does not exists.
+                    if (getState() == State.Failed
+                            && cause instanceof PulsarClientException.TopicDoesNotExistException) {
+                        closeAsync().whenComplete((v, ex) -> {
+                            if (ex != null) {
+                                log.error("Failed to close producer on TopicDoesNotExistException.", ex);
+                            }
+                            producerCreatedFuture.completeExceptionally(cause);
+                            if (getState() == State.Closing || getState() == State.Closed) {
+                                cnx.channel().close();
+                            }
+                        });
+                        return null;
+                    }
                     if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededException) {
                         synchronized (this) {
                             log.warn("[{}] [{}] Topic backlog quota exceeded. Throwing Exception on producer.", topic,