Posted to issues@karaf.apache.org by "michael elbaz (Jira)" <ji...@apache.org> on 2021/06/22 15:20:00 UTC
[jira] [Updated] (KARAF-7184) Unable to consume from Kafka topic using Karaf 4.3.1 with Camel 3.7.4
[ https://issues.apache.org/jira/browse/KARAF-7184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
michael elbaz updated KARAF-7184:
---------------------------------
Description:
Hello, I have a very basic Camel route that tries to consume messages from a Kafka topic and log them:
{code:java}
// Needs: import org.apache.kafka.common.security.plain.PlainLoginModule;
// INPUT_BROKER_ROUTE_ID and SASL_PLAINTEXT are assumed to be constants defined elsewhere (not shown).

@Override
public void configure() throws Exception {
    super.configure();
    // Consume from the Kafka endpoint and log each message body.
    from(kafka())
        .routeId(INPUT_BROKER_ROUTE_ID)
        .log("KAFKA BODY ::: ${body}");
}

// Builds the Kafka endpoint URI; the {{...}} tokens are Camel property placeholders.
private static String kafka() {
    return new StringBuilder("kafka:")
        .append("{{kafka.topic}}")
        .append("?brokers=")
        .append("{{kafka.brokers}}")
        .append("&groupId=")
        .append("{{kafka.group.id}}")
        .append("&clientId=")
        .append("myClientId")
        .append("&autoOffsetReset=")
        .append("earliest")
        .append("&saslMechanism=")
        .append("PLAIN")
        .append("&securityProtocol=")
        .append(SASL_PLAINTEXT)
        .append("&saslJaasConfig=")
        .append(saslJaasConfig())
        .toString();
}

// Builds the JAAS configuration string used for SASL/PLAIN authentication.
private static String saslJaasConfig() {
    return new StringBuilder(PlainLoginModule.class.getCanonicalName())
        .append(' ').append("required").append(' ')
        .append("username=")
        .append('"').append("{{kafka.username}}").append('"')
        .append(' ')
        .append("password=")
        .append('"').append("{{kafka.password}}").append('"')
        .append(';')
        .toString();
}
{code}
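For reference, here is a minimal standalone sketch of the endpoint URI that the two helper methods assemble. It assumes that SASL_PLAINTEXT holds the literal string "SASL_PLAINTEXT", and it writes out the canonical name of PlainLoginModule by hand so that it runs without Camel or kafka-clients on the classpath. The class name KafkaEndpointUriSketch is made up for illustration.

```java
class KafkaEndpointUriSketch {

    // Assumption: the route's SASL_PLAINTEXT constant holds this literal value.
    static final String SASL_PLAINTEXT = "SASL_PLAINTEXT";

    // Canonical name of Kafka's PlainLoginModule, spelled out so the sketch
    // compiles without the kafka-clients dependency.
    static final String PLAIN_LOGIN_MODULE =
            "org.apache.kafka.common.security.plain.PlainLoginModule";

    // Same JAAS string as the route's saslJaasConfig() helper.
    static String saslJaasConfig() {
        return PLAIN_LOGIN_MODULE + " required"
                + " username=\"{{kafka.username}}\""
                + " password=\"{{kafka.password}}\";";
    }

    // Same endpoint URI as the route's kafka() helper; the {{...}} tokens
    // are left unresolved here because only Camel substitutes them.
    static String kafka() {
        return "kafka:{{kafka.topic}}"
                + "?brokers={{kafka.brokers}}"
                + "&groupId={{kafka.group.id}}"
                + "&clientId=myClientId"
                + "&autoOffsetReset=earliest"
                + "&saslMechanism=PLAIN"
                + "&securityProtocol=" + SASL_PLAINTEXT
                + "&saslJaasConfig=" + saslJaasConfig();
    }

    public static void main(String[] args) {
        // Print the raw URI string that would be passed to from(...).
        System.out.println(kafka());
    }
}
```

Printing the URI this way shows exactly what is handed to Camel, including the spaces and double quotes inside the saslJaasConfig value.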
When I run a *unit test* on this route, *I see the topic's messages*, but when I deploy the bundle I don't receive any messages from the topic and no error is reported. Karaf only shows these logs:
{code:java}
11:37:27.064 INFO [Blueprint Event Dispatcher: 1] Successfully logged in.
11:37:27.067 WARN [Blueprint Event Dispatcher: 1] The configuration 'specific.avro.reader' was supplied but isn't a known config.
11:37:27.068 INFO [Blueprint Event Dispatcher: 1] Kafka version: 2.6.0
11:37:27.069 INFO [Blueprint Event Dispatcher: 1] Kafka commitId: 62abe01bee039651
11:37:27.069 INFO [Blueprint Event Dispatcher: 1] Kafka startTimeMs: 1624354647068
11:37:27.070 INFO [Blueprint Event Dispatcher: 1] Route: input-broker-route started and consuming from: kafka://test
11:37:27.070 INFO [Camel (integ-norauto-v1-input) thread #75 - KafkaConsumer[test]] Subscribing test-Thread 0 to topic test
11:37:27.071 INFO [Blueprint Event Dispatcher: 1] Total 1 routes, of which 1 are started
11:37:27.073 INFO [Camel (integ-norauto-v1-input) thread #75 - KafkaConsumer[test]] [Consumer clientId=myClientId, groupId=myGroupId] Subscribed to topic(s): test
11:37:27.073 INFO [Blueprint Event Dispatcher: 1] Apache Camel 3.7.4 (integ-norauto-v1-input) started in 38ms
{code}
I ran some tests and saw that even if I provide a wrong username or password, nothing happens (only when running on Karaf). *It looks like it is not really connected to Kafka.*
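A plain TCP probe such as the following sketch could help rule out a network problem between the Karaf host and the broker. It only checks that a socket can be opened; it does not exercise SASL authentication or the Kafka protocol. The class name BrokerReachabilitySketch, the default host "localhost", and the default port 9092 are placeholders, not values taken from this setup.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class BrokerReachabilitySketch {

    // Returns true if a TCP connection to host:port opens within timeoutMs.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder defaults; pass the real broker host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

If the probe succeeds from the machine running Karaf, basic network reachability is unlikely to be the cause, and attention can move to the consumer configuration inside the container.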
was:
Hello, I have a very simple Camel route that tries to consume messages from a Kafka topic and log them:
{code:java}
@Override
public void configure() throws Exception {
    super.configure();
    from(kafka())
        .routeId(INPUT_BROKER_ROUTE_ID)
        .log("KAFKA BODY ::: ${body}");
}

private static String kafka() {
    return new StringBuilder("kafka:")
        .append("{{kafka.topic}}")
        .append("?brokers=")
        .append("{{kafka.brokers}}")
        .append("&groupId=")
        .append("{{kafka.group.id}}")
        .append("&clientId=")
        .append("myClientId")
        .append("&autoOffsetReset=")
        .append("earliest")
        .append("&saslMechanism=")
        .append("PLAIN")
        .append("&securityProtocol=")
        .append(SASL_PLAINTEXT)
        .append("&saslJaasConfig=")
        .append(saslJaasConfig())
        .toString();
}

private static String saslJaasConfig() {
    return new StringBuilder(format("%s", PlainLoginModule.class.getCanonicalName()))
        .append(' ').append("required").append(' ')
        .append("username=")
        .append('"').append("{{kafka.username}}").append('"')
        .append(' ')
        .append("password=")
        .append('"').append("{{kafka.password}}").append('"')
        .append(';')
        .toString();
}
{code}
When I run a *unit test* on this route, *I see the topic's messages*, but when I deploy the bundle I don't receive any messages from the topic and no error is reported. Karaf only shows these logs:
{code:java}
11:37:27.064 INFO [Blueprint Event Dispatcher: 1] Successfully logged in.
11:37:27.067 WARN [Blueprint Event Dispatcher: 1] The configuration 'specific.avro.reader' was supplied but isn't a known config.
11:37:27.068 INFO [Blueprint Event Dispatcher: 1] Kafka version: 2.6.0
11:37:27.069 INFO [Blueprint Event Dispatcher: 1] Kafka commitId: 62abe01bee039651
11:37:27.069 INFO [Blueprint Event Dispatcher: 1] Kafka startTimeMs: 1624354647068
11:37:27.070 INFO [Blueprint Event Dispatcher: 1] Route: input-broker-route started and consuming from: kafka://test
11:37:27.070 INFO [Camel (integ-norauto-v1-input) thread #75 - KafkaConsumer[test]] Subscribing test-Thread 0 to topic test
11:37:27.071 INFO [Blueprint Event Dispatcher: 1] Total 1 routes, of which 1 are started
11:37:27.073 INFO [Camel (integ-norauto-v1-input) thread #75 - KafkaConsumer[test]] [Consumer clientId=myClientId, groupId=myGroupId] Subscribed to topic(s): test
11:37:27.073 INFO [Blueprint Event Dispatcher: 1] Apache Camel 3.7.4 (integ-norauto-v1-input) started in 38ms
{code}
I ran some tests and saw that even if I provide a wrong username or password, nothing happens (only when running on Karaf). *It looks like it is not really connected to Kafka.*
> Unable to consume from Kafka topic using Karaf 4.3.1 with Camel 3.7.4
> ---------------------------------------------------------------------
>
> Key: KARAF-7184
> URL: https://issues.apache.org/jira/browse/KARAF-7184
> Project: Karaf
> Issue Type: Bug
> Components: karaf
> Affects Versions: 4.3.2
> Environment: *strong text*
> Reporter: michael elbaz
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)