You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Andrea Cosentino (Jira)" <ji...@apache.org> on 2021/07/16 05:25:00 UTC
[jira] [Resolved] (CAMEL-16807) camel-kafka - problem using two
kafka connections in the same application
[ https://issues.apache.org/jira/browse/CAMEL-16807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andrea Cosentino resolved CAMEL-16807.
--------------------------------------
Resolution: Fixed
> camel-kafka - problem using two kafka connections in the same application
> -------------------------------------------------------------------------
>
> Key: CAMEL-16807
> URL: https://issues.apache.org/jira/browse/CAMEL-16807
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.8.0
> Reporter: Eric Wittmann
> Assignee: Andrea Cosentino
> Priority: Major
> Fix For: 3.11.1, 3.12.0, 3.7.6
>
>
> When using camel-kafka there will typically be many routes that produce to multiple Kafka topics with different schemas associated.
> When using the camel-kafka client with apicurio, explicit schema artifacts set on Kafka producers override each other with the same settings despite them having different schema artifacts set through the camel additionalProperties settings.
> This results in kafka client serialization exceptions stating a matching protobuf message is not found [1].
> The expected behavior is that the apicurio properties should be applied to each Kafka producer independently.
> A full example is available here: [https://github.com/shuawest/apicurio-sandbox].
>
> {code:java}
> String kdestA = KafkaUriBuilder.create(ProducerService.TOPIC_A)
> .appendProperty("brokers", "{{aregsandbox.kafka.brokers}}")
> .appendProperty("clientId", "producerA")
> .appendProperty("valueSerializer", MyProtobufKafkaSerializer.class.getName())
> .appendProperty("maxRequestSize", "5242880")
> .appendAdditional(SerdeConfig.REGISTRY_URL, "{{registryurl}}")
> //.appendAdditional(SerdeConfig.SCHEMA_RESOLVER, MySchemaResolver.class.getName())
> //.appendAdditional(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, MyArtifactResolverStrategy.class.getName())
> .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "aregsandbox")
> .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_ID, "samplea")
> .appendAdditional(SerdeConfig.FIND_LATEST_ARTIFACT, "true")
> //.appendAdditional(SerdeConfig.CHECK_PERIOD_MS, "60000")
> .value();
>
> String kdestB = KafkaUriBuilder.create(ProducerService.TOPIC_B)
> .appendProperty("brokers", "{{aregsandbox.kafka.brokers}}")
> .appendProperty("clientId", "producerB")
> .appendProperty("valueSerializer", MyProtobufKafkaSerializer.class.getName())
> .appendProperty("maxRequestSize", "5242880")
> .appendAdditional(SerdeConfig.REGISTRY_URL, "{{registryurl}}")
> //.appendAdditional(SerdeConfig.SCHEMA_RESOLVER, MySchemaResolver.class.getName())
> //.appendAdditional(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, MyArtifactResolverStrategy.class.getName())
> .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "aregsandbox")
> .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_ID, "sampleb")
> .appendAdditional(SerdeConfig.FIND_LATEST_ARTIFACT, "true")
> //.appendAdditional(SerdeConfig.CHECK_PERIOD_MS, "60000")
> .value();
>
> log.info("Kafka connection A: {}\n\n\n", kdestA);
> log.info("Kafka connection B: {}\n\n\n", kdestB);
> from("timer:producerTimerA?repeatCount=1000&delay=1s&period=1s")
> .bean(this, "genA")
> //.log("producer timer fired ${headers.genCount}:\n${body}");
> .to(kdestA);
> from("timer:producerTimerB?repeatCount=1000&delay=1s&period=1s")
> .bean(this, "genB")
> //.log("producer timer fired ${headers.genCount}:\n${body}");
> .to(kdestB);
> {code}
> The root cause seems to be that the Kafka configuration is cloned for each new connector, but the clone does not make a copy of the additionalProperties field. The result is that all Kafka configuration instances share the same Map of additionalProperties.
> Tested in 3.8.0 but it looks like this problem exists in the latest current version as well.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)