Posted to issues@camel.apache.org by "Andrea Cosentino (Jira)" <ji...@apache.org> on 2021/07/16 05:25:00 UTC

[jira] [Resolved] (CAMEL-16807) camel-kafka - problem using two kafka connections in the same application

     [ https://issues.apache.org/jira/browse/CAMEL-16807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrea Cosentino resolved CAMEL-16807.
--------------------------------------
    Resolution: Fixed

> camel-kafka - problem using two kafka connections in the same application
> -------------------------------------------------------------------------
>
>                 Key: CAMEL-16807
>                 URL: https://issues.apache.org/jira/browse/CAMEL-16807
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.8.0
>            Reporter: Eric Wittmann
>            Assignee: Andrea Cosentino
>            Priority: Major
>             Fix For: 3.11.1, 3.12.0, 3.7.6
>
>
> When using camel-kafka, there will typically be many routes that produce to multiple Kafka topics, each with a different schema associated.
> When using the camel-kafka component with Apicurio, the explicit schema artifact settings configured on the Kafka producers override each other: every producer ends up with the same settings, even though each producer is given a different schema artifact through the Camel additionalProperties settings.
> This results in Kafka client serialization exceptions stating that a matching protobuf message is not found [1].
> The expected behavior is that the Apicurio properties are applied to each Kafka producer independently.
> A full example is available here: [https://github.com/shuawest/apicurio-sandbox].
>  
> {code:java}
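>         // Two producer endpoint URIs that differ only in the target topic, clientId and explicit Apicurio artifact id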
>         String kdestA = KafkaUriBuilder.create(ProducerService.TOPIC_A)
>             .appendProperty("brokers", "{{aregsandbox.kafka.brokers}}")
>             .appendProperty("clientId", "producerA")
>             .appendProperty("valueSerializer", MyProtobufKafkaSerializer.class.getName())
>             .appendProperty("maxRequestSize", "5242880")
>             .appendAdditional(SerdeConfig.REGISTRY_URL, "{{registryurl}}")
>             //.appendAdditional(SerdeConfig.SCHEMA_RESOLVER, MySchemaResolver.class.getName())
>             //.appendAdditional(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, MyArtifactResolverStrategy.class.getName())
>             .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "aregsandbox")
>             .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_ID, "samplea")
>             .appendAdditional(SerdeConfig.FIND_LATEST_ARTIFACT, "true")
>             //.appendAdditional(SerdeConfig.CHECK_PERIOD_MS, "60000")
>             .value();
>     
>         String kdestB = KafkaUriBuilder.create(ProducerService.TOPIC_B)
>             .appendProperty("brokers", "{{aregsandbox.kafka.brokers}}")
>             .appendProperty("clientId", "producerB")
>             .appendProperty("valueSerializer", MyProtobufKafkaSerializer.class.getName())
>             .appendProperty("maxRequestSize", "5242880")
>             .appendAdditional(SerdeConfig.REGISTRY_URL, "{{registryurl}}")
>             //.appendAdditional(SerdeConfig.SCHEMA_RESOLVER, MySchemaResolver.class.getName())
>             //.appendAdditional(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, MyArtifactResolverStrategy.class.getName())
>             .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "aregsandbox")
>             .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_ID, "sampleb")
>             .appendAdditional(SerdeConfig.FIND_LATEST_ARTIFACT, "true")
>             //.appendAdditional(SerdeConfig.CHECK_PERIOD_MS, "60000")
>             .value();
>     
>         log.info("Kafka connection A: {}\n\n\n", kdestA);
>         log.info("Kafka connection B: {}\n\n\n", kdestB);
>         from("timer:producerTimerA?repeatCount=1000&delay=1s&period=1s")
>             .bean(this, "genA")
>             //.log("producer timer fired ${headers.genCount}:\n${body}");
>             .to(kdestA);
>         from("timer:producerTimerB?repeatCount=1000&delay=1s&period=1s")
>             .bean(this, "genB")
>             //.log("producer timer fired ${headers.genCount}:\n${body}");
>             .to(kdestB);
> {code}
> The root cause seems to be that the Kafka configuration is cloned for each new connector, but the clone does not make a copy of the additionalProperties field.  The result is that all Kafka configuration instances share the same Map of additionalProperties.
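> The sketch below is a minimal, self-contained illustration of that shared-map behaviour; the class, field and property key are hypothetical stand-ins, not the actual camel-kafka KafkaConfiguration code. Object.clone() only copies the reference to the map, so every shallow clone mutates the same underlying additionalProperties instance unless the copy explicitly replaces it with a new Map.
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> 
> // Hypothetical configuration class used only to illustrate the shallow-copy pitfall.
> class SampleConfiguration implements Cloneable {
>     private Map<String, Object> additionalProperties = new HashMap<>();
> 
>     Map<String, Object> getAdditionalProperties() {
>         return additionalProperties;
>     }
> 
>     // Shallow copy: the clone shares the same Map instance (the bug pattern described above).
>     SampleConfiguration shallowCopy() {
>         try {
>             return (SampleConfiguration) clone();
>         } catch (CloneNotSupportedException e) {
>             throw new IllegalStateException(e);
>         }
>     }
> 
>     // Defensive copy: the clone gets its own Map, so per-producer settings stay independent.
>     SampleConfiguration safeCopy() {
>         SampleConfiguration copy = shallowCopy();
>         copy.additionalProperties = new HashMap<>(this.additionalProperties);
>         return copy;
>     }
> 
>     public static void main(String[] args) {
>         SampleConfiguration base = new SampleConfiguration();
> 
>         SampleConfiguration producerA = base.shallowCopy();
>         SampleConfiguration producerB = base.shallowCopy();
>         producerA.getAdditionalProperties().put("artifact-id", "samplea");
>         producerB.getAdditionalProperties().put("artifact-id", "sampleb");
>         // Both print {artifact-id=sampleb}: the second producer silently overwrote the first.
>         System.out.println(producerA.getAdditionalProperties());
>         System.out.println(producerB.getAdditionalProperties());
> 
>         SampleConfiguration producerC = base.safeCopy();
>         SampleConfiguration producerD = base.safeCopy();
>         producerC.getAdditionalProperties().put("artifact-id", "samplea");
>         producerD.getAdditionalProperties().put("artifact-id", "sampleb");
>         // Each copy now has its own map, so the explicit artifact ids no longer clash.
>         System.out.println(producerC.getAdditionalProperties());
>         System.out.println(producerD.getAdditionalProperties());
>     }
> }
> {code}
> Under that reading, copying the additionalProperties Map whenever the configuration is duplicated for a new endpoint would restore the expected per-producer behaviour.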
> Tested in 3.8.0, but it looks like this problem exists in the latest version as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)