You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/26 16:08:22 UTC

[pulsar] branch master updated: [pulsar-broker]Fix: client-producer can't connect due to failed producer-future on cnx (#4138)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fbdf038  [pulsar-broker]Fix: client-producer can't connect due to failed producer-future on cnx (#4138)
fbdf038 is described below

commit fbdf0383492277fbb0589a5485e2171d1f3c3568
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri Apr 26 09:08:17 2019 -0700

    [pulsar-broker]Fix: client-producer can't connect due to failed producer-future on cnx (#4138)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java    | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 83861b1..aebcb30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -100,10 +100,8 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.sasl.SaslConstants;
 import org.apache.pulsar.common.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaInfoUtil;
-import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -851,8 +849,13 @@ public class ServerCnx extends PulsarHandler {
                                 // until the previous producer creation
                                 // request
                                 // either complete or fails.
-                                ServerError error = !existingProducerFuture.isDone() ? ServerError.ServiceNotReady
-                                        : getErrorCode(existingProducerFuture);
+                                ServerError error = null;
+                                if(!existingProducerFuture.isDone()) {
+                                    error = ServerError.ServiceNotReady;
+                                }else {
+                                    error = getErrorCode(existingProducerFuture);
+                                    producers.remove(producerId, producerFuture);
+                                }
                                 log.warn("[{}][{}] Producer with id {} is already present on the connection", remoteAddress,
                                         producerId, topicName);
                                 ctx.writeAndFlush(Commands.newError(requestId, error,
@@ -899,6 +902,8 @@ public class ServerCnx extends PulsarHandler {
                                 schemaVersionFuture = topic.addSchema(schema);
                             } else {
                                 schemaVersionFuture = topic.hasSchema().thenCompose((hasSchema) -> {
+                                        log.info("[{}]-{} {} configured with schema {}", remoteAddress, producerId,
+                                                topicName, hasSchema);
                                         CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
                                         if (hasSchema && schemaValidationEnforced) {
                                             result.completeExceptionally(new IncompatibleSchemaException(