You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/26 16:08:22 UTC
[pulsar] branch master updated: [pulsar-broker]Fix: client-producer
can't connect due to failed producer-future on cnx (#4138)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fbdf038 [pulsar-broker]Fix: client-producer can't connect due to failed producer-future on cnx (#4138)
fbdf038 is described below
commit fbdf0383492277fbb0589a5485e2171d1f3c3568
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri Apr 26 09:08:17 2019 -0700
[pulsar-broker]Fix: client-producer can't connect due to failed producer-future on cnx (#4138)
---
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 83861b1..aebcb30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -100,10 +100,8 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.sasl.SaslConstants;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfoUtil;
-import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -851,8 +849,13 @@ public class ServerCnx extends PulsarHandler {
// until the previous producer creation
// request
// either complete or fails.
- ServerError error = !existingProducerFuture.isDone() ? ServerError.ServiceNotReady
- : getErrorCode(existingProducerFuture);
+ ServerError error = null;
+ if(!existingProducerFuture.isDone()) {
+ error = ServerError.ServiceNotReady;
+ }else {
+ error = getErrorCode(existingProducerFuture);
+ producers.remove(producerId, producerFuture);
+ }
log.warn("[{}][{}] Producer with id {} is already present on the connection", remoteAddress,
producerId, topicName);
ctx.writeAndFlush(Commands.newError(requestId, error,
@@ -899,6 +902,8 @@ public class ServerCnx extends PulsarHandler {
schemaVersionFuture = topic.addSchema(schema);
} else {
schemaVersionFuture = topic.hasSchema().thenCompose((hasSchema) -> {
+ log.info("[{}]-{} {} configured with schema {}", remoteAddress, producerId,
+ topicName, hasSchema);
CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
if (hasSchema && schemaValidationEnforced) {
result.completeExceptionally(new IncompatibleSchemaException(