Posted to issues@karaf.apache.org by "michael elbaz (Jira)" <ji...@apache.org> on 2021/06/22 15:20:00 UTC

[jira] [Updated] (KARAF-7184) Unable to consume from Kafka topic using Karaf 4.3.1 with Camel 3.7.4

     [ https://issues.apache.org/jira/browse/KARAF-7184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

michael elbaz updated KARAF-7184:
---------------------------------
    Description: 
Hello, I have a very basic Camel route (trying to consume and log messages from a Kafka topic):


{code:java}
@Override
public void configure() throws Exception {
    super.configure();

    from(kafka())
            .routeId(INPUT_BROKER_ROUTE_ID)

            .log("KAFKA BODY ::: ${body}");
}

private static String kafka() {
    return new StringBuilder("kafka:")
            .append("{{kafka.topic}}")
            .append("?brokers=")
            .append("{{kafka.brokers}}")
            .append("&groupId=")
            .append("{{kafka.group.id}}")
            .append("&clientId=")
            .append("myClientId")
            .append("&autoOffsetReset=")
            .append("earliest")
            .append("&saslMechanism=")
            .append("PLAIN")
            .append("&securityProtocol=")
            .append(SASL_PLAINTEXT)
            .append("&saslJaasConfig=")
            .append(saslJaasConfig())
            .toString();
}

private static String saslJaasConfig() {
    return new StringBuilder(format("%s", PlainLoginModule.class.getCanonicalName()))
            .append(' ').append("required").append(' ')
            .append("username=")
            .append('"').append("{{kafka.username}}").append('"')
            .append(' ')
            .append("password=")
            .append('"').append("{{kafka.password}}").append('"')
            .append(';')
            .toString();
}
{code}
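
For readability, here is a minimal standalone sketch of the endpoint URI that the builder above assembles, before Camel resolves the {{...}} placeholders. It assumes that SASL_PLAINTEXT is a constant holding the string "SASL_PLAINTEXT" and that PlainLoginModule is Kafka's org.apache.kafka.common.security.plain.PlainLoginModule; the class name KafkaUriSketch is hypothetical and only exists for this illustration.

```java
public class KafkaUriSketch {
    // Assumption: the route's SASL_PLAINTEXT constant holds this protocol name.
    static final String SASL_PLAINTEXT = "SASL_PLAINTEXT";
    // Assumption: PlainLoginModule is Kafka's plain SASL login module.
    static final String PLAIN_LOGIN_MODULE =
            "org.apache.kafka.common.security.plain.PlainLoginModule";

    // Same JAAS line as saslJaasConfig() in the route, written as one expression.
    public static String saslJaasConfig() {
        return PLAIN_LOGIN_MODULE + " required"
                + " username=\"{{kafka.username}}\""
                + " password=\"{{kafka.password}}\";";
    }

    // Same endpoint URI as kafka() in the route, with the options in the same order.
    public static String kafka() {
        return "kafka:{{kafka.topic}}"
                + "?brokers={{kafka.brokers}}"
                + "&groupId={{kafka.group.id}}"
                + "&clientId=myClientId"
                + "&autoOffsetReset=earliest"
                + "&saslMechanism=PLAIN"
                + "&securityProtocol=" + SASL_PLAINTEXT
                + "&saslJaasConfig=" + saslJaasConfig();
    }

    public static void main(String[] args) {
        // Print the unresolved URI so the options can be checked by eye.
        System.out.println(kafka());
    }
}
```

Note that the saslJaasConfig value ends up in the URI with unescaped spaces, double quotes, '=' and ';' characters.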

If I run a *unit test* on this route, *I see the topic's messages*, but when I deploy this bundle I don't get any messages from my topic and there is no error. Karaf just shows me these logs:


{code:java}
11:37:27.064 INFO [Blueprint Event Dispatcher: 1] Successfully logged in.
11:37:27.067 WARN [Blueprint Event Dispatcher: 1] The configuration 'specific.avro.reader' was supplied but isn't a known config.
11:37:27.068 INFO [Blueprint Event Dispatcher: 1] Kafka version: 2.6.0
11:37:27.069 INFO [Blueprint Event Dispatcher: 1] Kafka commitId: 62abe01bee039651
11:37:27.069 INFO [Blueprint Event Dispatcher: 1] Kafka startTimeMs: 1624354647068
11:37:27.070 INFO [Blueprint Event Dispatcher: 1] Route: input-broker-route started and consuming from: kafka://test
11:37:27.070 INFO [Camel (integ-norauto-v1-input) thread #75 - KafkaConsumer[test]] Subscribing test-Thread 0 to topic test
11:37:27.071 INFO [Blueprint Event Dispatcher: 1] Total 1 routes, of which 1 are started
11:37:27.073 INFO [Camel (integ-norauto-v1-input) thread #75 - KafkaConsumer[test]] [Consumer clientId=myClientId, groupId=myGroupId] Subscribed to topic(s): test
11:37:27.073 INFO [Blueprint Event Dispatcher: 1] Apache Camel 3.7.4 (integ-norauto-v1-input) started in 38ms
{code}


I ran some tests and saw that even if I provide a wrong username or password, nothing happens (only when running on Karaf). *It looks like it's not really connected to Kafka.*


  was:
Hello, I have a very simple Camel route (trying to consume and log messages from a Kafka topic):


{code:java}
@Override
public void configure() throws Exception {
    super.configure();

    from(kafka())
            .routeId(INPUT_BROKER_ROUTE_ID)

            .log("KAFKA BODY ::: ${body}");
}

private static String kafka() {
    return new StringBuilder("kafka:")
            .append("{{kafka.topic}}")
            .append("?brokers=")
            .append("{{kafka.brokers}}")
            .append("&groupId=")
            .append("{{kafka.group.id}}")
            .append("&clientId=")
            .append("myClientId")
            .append("&autoOffsetReset=")
            .append("earliest")
            .append("&saslMechanism=")
            .append("PLAIN")
            .append("&securityProtocol=")
            .append(SASL_PLAINTEXT)
            .append("&saslJaasConfig=")
            .append(saslJaasConfig())
            .toString();
}

private static String saslJaasConfig() {
    return new StringBuilder(format("%s", PlainLoginModule.class.getCanonicalName()))
            .append(' ').append("required").append(' ')
            .append("username=")
            .append('"').append("{{kafka.username}}").append('"')
            .append(' ')
            .append("password=")
            .append('"').append("{{kafka.password}}").append('"')
            .append(';')
            .toString();
}
{code}

If I run a *unit test* on this route, *I see the topic's messages*, but when I deploy this bundle I don't get any messages from my topic and there is no error. Karaf just shows me these logs:


{code:java}
11:37:27.064 INFO [Blueprint Event Dispatcher: 1] Successfully logged in.
11:37:27.067 WARN [Blueprint Event Dispatcher: 1] The configuration 'specific.avro.reader' was supplied but isn't a known config.
11:37:27.068 INFO [Blueprint Event Dispatcher: 1] Kafka version: 2.6.0
11:37:27.069 INFO [Blueprint Event Dispatcher: 1] Kafka commitId: 62abe01bee039651
11:37:27.069 INFO [Blueprint Event Dispatcher: 1] Kafka startTimeMs: 1624354647068
11:37:27.070 INFO [Blueprint Event Dispatcher: 1] Route: input-broker-route started and consuming from: kafka://test
11:37:27.070 INFO [Camel (integ-norauto-v1-input) thread #75 - KafkaConsumer[test]] Subscribing test-Thread 0 to topic test
11:37:27.071 INFO [Blueprint Event Dispatcher: 1] Total 1 routes, of which 1 are started
11:37:27.073 INFO [Camel (integ-norauto-v1-input) thread #75 - KafkaConsumer[test]] [Consumer clientId=myClientId, groupId=myGroupId] Subscribed to topic(s): test
11:37:27.073 INFO [Blueprint Event Dispatcher: 1] Apache Camel 3.7.4 (integ-norauto-v1-input) started in 38ms
{code}


I ran some tests and saw that even if I provide a wrong username or password, nothing happens (only when running on Karaf). *It looks like it's not really connected to Kafka.*



> Unable to consume from Kafka topic using Karaf 4.3.1 with Camel 3.7.4
> ---------------------------------------------------------------------
>
>                 Key: KARAF-7184
>                 URL: https://issues.apache.org/jira/browse/KARAF-7184
>             Project: Karaf
>          Issue Type: Bug
>          Components: karaf
>    Affects Versions: 4.3.2
>         Environment: *strong text*
>            Reporter: michael elbaz
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)