You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Eric Wittmann (Jira)" <ji...@apache.org> on 2021/07/15 16:04:01 UTC

[jira] [Created] (CAMEL-16807) camel-kafka - problem using two kafka connections in the same application

Eric Wittmann created CAMEL-16807:
-------------------------------------

             Summary: camel-kafka - problem using two kafka connections in the same application
                 Key: CAMEL-16807
                 URL: https://issues.apache.org/jira/browse/CAMEL-16807
             Project: Camel
          Issue Type: Bug
          Components: camel-kafka
    Affects Versions: 3.8.0
            Reporter: Eric Wittmann


When using camel-kafka there will typically be many routes that produce to multiple Kafka topics with different schemas associated.

When using the camel-kafka client with apicurio, explicit schema artifacts set on Kafka producers override each other with the same settings despite them having different schema artifacts set through the camel additionalProperties settings.

This results in kafka client serialization exceptions stating a matching protobuf message is not found [1].

The expected behavior is that the apicurio properties should be applied to each Kafka producer independently.

A full example is available here: [https://github.com/shuawest/apicurio-sandbox].

 
{code:java}
String kdestA = KafkaUriBuilder.create(ProducerService.TOPIC_A)
            .appendProperty("brokers", "{{aregsandbox.kafka.brokers}}")
            .appendProperty("clientId", "producerA")
            .appendProperty("valueSerializer", MyProtobufKafkaSerializer.class.getName())
            .appendProperty("maxRequestSize", "5242880")
            .appendAdditional(SerdeConfig.REGISTRY_URL, "{{registryurl}}")
            //.appendAdditional(SerdeConfig.SCHEMA_RESOLVER, MySchemaResolver.class.getName())
            //.appendAdditional(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, MyArtifactResolverStrategy.class.getName())
            .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "aregsandbox")
            .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_ID, "samplea")
            .appendAdditional(SerdeConfig.FIND_LATEST_ARTIFACT, "true")
            //.appendAdditional(SerdeConfig.CHECK_PERIOD_MS, "60000")
            .value();
    
        String kdestB = KafkaUriBuilder.create(ProducerService.TOPIC_B)
            .appendProperty("brokers", "{{aregsandbox.kafka.brokers}}")
            .appendProperty("clientId", "producerB")
            .appendProperty("valueSerializer", MyProtobufKafkaSerializer.class.getName())
            .appendProperty("maxRequestSize", "5242880")
            .appendAdditional(SerdeConfig.REGISTRY_URL, "{{registryurl}}")
            //.appendAdditional(SerdeConfig.SCHEMA_RESOLVER, MySchemaResolver.class.getName())
            //.appendAdditional(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, MyArtifactResolverStrategy.class.getName())
            .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "aregsandbox")
            .appendAdditional(SerdeConfig.EXPLICIT_ARTIFACT_ID, "sampleb")
            .appendAdditional(SerdeConfig.FIND_LATEST_ARTIFACT, "true")
            //.appendAdditional(SerdeConfig.CHECK_PERIOD_MS, "60000")
            .value();
    
        log.info("Kafka connection A: {}\n\n\n", kdestA);
        log.info("Kafka connection B: {}\n\n\n", kdestB);

        from("timer:producerTimerA?repeatCount=1000&delay=1s&period=1s")
            .bean(this, "genA")
            //.log("producer timer fired ${headers.genCount}:\n${body}");
            .to(kdestA);

        from("timer:producerTimerB?repeatCount=1000&delay=1s&period=1s")
            .bean(this, "genB")
            //.log("producer timer fired ${headers.genCount}:\n${body}");
            .to(kdestB);
{code}

The root cause seems to be that the Kafka configuration is cloned for each new connector, but the clone does not make a copy of the additionalProperties field.  The result is that all Kafka configuration instances share the same Map of additionalProperties.

Tested in 3.8.0 but it looks like this problem exists in the latest current version as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)