Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2020/10/27 10:22:14 UTC
[GitHub] [camel-quarkus] tstuber opened a new issue #1960: camel-quarkus-kafka: Reconnection issues when working with schema registry
tstuber opened a new issue #1960:
URL: https://github.com/apache/camel-quarkus/issues/1960
Hi all
I am trying to consume Avro messages from Kafka with camel-quarkus-kafka, using the schema registry.
**My test setup:**
- local Kafka broker/zk
- local schema registry
- local Kafka event producer, writing new random Avro messages (schema: fields with "id" and "message")
**Issue**
When I add the schema registry parameters, I run into an infinite loop of consumer reconnections, and the log step is never reached. I am not sure whether I misconfigured something or whether there is an underlying bug.
What I would like to achieve is that the consumer derives the schema automatically and converts the Avro message into a POJO for further processing.
from(kafka("avro-events")
        .brokers("{{kafka.bootstrap.servers}}")
        .schemaRegistryURL("{{schema.registry.url}}")
        .specificAvroReader(true)
        .valueDeserializer("io.confluent.kafka.serializers.KafkaAvroDeserializer")
        .resolve(getContext()))
    .log("Avro Message: ${body}");
```
2020-10-27 11:06:22,746 INFO [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 5919fa66-4bf2-47b2-b227-3d02769f3225
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer
2020-10-27 11:06:22,747 INFO [io.con.kaf.ser.KafkaAvroDeserializerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) KafkaAvroDeserializerConfig values:
bearer.auth.token = [hidden]
proxy.port = -1
schema.reflection = false
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
specific.avro.reader = true
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
schema.registry.url = [localhost:8081]
basic.auth.user.info = [hidden]
proxy.host =
use.latest.version = false
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
2020-10-27 11:06:22,753 INFO [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Kafka version: 2.5.0
2020-10-27 11:06:22,753 INFO [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Kafka commitId: 66563e712b0b9f84
2020-10-27 11:06:22,754 INFO [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Kafka startTimeMs: 1603793182753
2020-10-27 11:06:22,754 INFO [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Reconnecting avro-events-Thread 0 to topic avro-events after 5000 ms
2020-10-27 11:06:27,755 INFO [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Subscribing avro-events-Thread 0 to topic avro-events
2020-10-27 11:06:27,757 INFO [org.apa.kaf.cli.con.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Subscribed to topic(s): avro-events
2020-10-27 11:06:27,774 INFO [org.apa.kaf.cli.Metadata] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Cluster ID: CBXaaoMdSfu7ChosVF8hRQ
2020-10-27 11:06:27,776 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2020-10-27 11:06:27,779 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] (Re-)joining group
2020-10-27 11:06:27,792 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2020-10-27 11:06:27,793 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] (Re-)joining group
2020-10-27 11:06:27,799 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Finished assignment for group at generation 3: {consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2-e4c3ddd6-e9da-4b80-bbf5-1631a3f08c9f=Assignment(partitions=[avro-events-0])}
2020-10-27 11:06:27,802 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Successfully joined group with generation 3
2020-10-27 11:06:27,802 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Adding newly assigned partitions: avro-events-0
2020-10-27 11:06:27,808 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Setting offset for partition avro-events-0 to the committed offset FetchPosition{offset=914, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2020-10-27 11:06:27,816 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Revoke previously assigned partitions avro-events-0
2020-10-27 11:06:27,816 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Member consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2-e4c3ddd6-e9da-4b80-bbf5-1631a3f08c9f sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer is being closed
2020-10-27 11:06:27,823 INFO [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 5919fa66-4bf2-47b2-b227-3d02769f3225
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer
```
When I consume and log messages without involving the schema registry, it works fine:
from(kafka("avro-events")
        .brokers("{{kafka.bootstrap.servers}}")
        .resolve(getContext()))
    .log("Avro Message: ${body}");
Output:
```
2020-10-27 10:58:07,699 INFO [route1] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Avro Message: �H786db2c3-c0ee-497e-ba61-dabca64f2729
2020-10-27 10:58:10,710 INFO [org.apa.cam.Tracing] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [route1 ] [log ] Exchange[Id: D25D23EC4A3C0CF-000000000000001D, BodyType: String, Body: �H38dadb0e-75b3-484c-b499-f61c654512a5]
2020-10-27 10:58:10,710 INFO [route1] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Avro Message: �H38dadb0e-75b3-484c-b499-f61c654512a5
2020-10-27 10:58:13,699 INFO [org.apa.cam.Tracing] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [route1 ] [log ] Exchange[Id: D25D23EC4A3C0CF-000000000000001E, BodyType: String, Body: �Ha53e0dfa-e302-4b1e-a5ff-db33bcf6972e]
2020-10-27 10:58:13,699 INFO [route1] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Avro Message: �Ha53e0dfa-e302-4b1e-a5ff-db33bcf6972e
2020-10-27 10:58:16,707 INFO [org.apa.cam.Tracing] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [route1 ] [log ] Exchange[Id: D25D23EC4A3C0CF-000000000000001F, BodyType: String, Body: �Hc116e361-7831-4e06-9d1b-759b8bb516f5]
2020-10-27 10:58:16,707 INFO [route1] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Avro Message: �Hc116e361-7831-4e06-9d1b-759b8bb516f5
```
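The unreadable characters at the start of each body above are probably binary framing rather than corruption. As a minimal sketch, assuming the producer uses the Confluent Avro serializer (the thread does not confirm this), each message would start with a magic byte `0` followed by a 4-byte big-endian schema ID, and only then the Avro-encoded payload. The class and method names here are hypothetical and only illustrate that header:

```java
import java.nio.ByteBuffer;

final class WireFormatHeader {
    // Assumption: Confluent framing, where the first byte of every message is 0.
    static final byte MAGIC_BYTE = 0x0;

    /** Returns the schema ID from a framed message, or -1 if the framing is absent. */
    static int schemaId(byte[] payload) {
        if (payload == null || payload.length < 5 || payload[0] != MAGIC_BYTE) {
            return -1;
        }
        // ByteBuffer is big-endian by default, which matches the 4-byte schema ID.
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // Made-up example bytes: magic byte, schema ID 7, then the first payload byte.
        byte[] framed = {0, 0, 0, 0, 7, 0x48};
        System.out.println(schemaId(framed)); // prints 7
    }
}
```

Reading the body as a `String`, as the route above does, would render these header bytes as unprintable or replacement characters, which is why a schema-aware deserializer is needed to get structured data.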
Thanks a lot for your support.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-quarkus] tstuber commented on issue #1960: camel-quarkus-kafka: Reconnection issues when working with schema registry
Posted by GitBox <gi...@apache.org>.
tstuber commented on issue #1960:
URL: https://github.com/apache/camel-quarkus/issues/1960#issuecomment-750064745
I am going to close this issue.
* Not having POJOs generated from the Avro schema was my mistake. That is not an issue related to camel-quarkus.
* The issue with the infinite re-connection loop (that I faced during deserialisation due to wrong config) is already addressed here: https://issues.apache.org/jira/browse/CAMEL-14980
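To illustrate why the consumer needs schema-derived classes at all: Avro binary data carries no field names or types, so a reader can only decode a record if it already knows the field order and types. The following is a minimal sketch, not the deserializer's actual implementation. It assumes a record with two string fields, `id` and `message` (the field types are an assumption; the thread only gives the names), and assumes any framing header has already been removed. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class AvroEventDecoder {
    /** Reads an Avro int/long: a variable-length, zigzag-encoded integer. */
    static long readZigZagLong(ByteBuffer in) {
        long raw = 0;
        int shift = 0;
        byte b;
        do {
            b = in.get();
            raw |= (long) (b & 0x7F) << shift; // low 7 bits carry data
            shift += 7;
        } while ((b & 0x80) != 0);             // high bit set means more bytes follow
        return (raw >>> 1) ^ -(raw & 1);       // undo zigzag encoding
    }

    /** Reads an Avro string: a length prefix followed by that many UTF-8 bytes. */
    static String readString(ByteBuffer in) {
        int length = (int) readZigZagLong(in);
        byte[] utf8 = new byte[length];
        in.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    /** Decodes the assumed record layout: string id, then string message. */
    static String[] decodeEvent(byte[] avroBody) {
        ByteBuffer in = ByteBuffer.wrap(avroBody);
        String id = readString(in);
        String message = readString(in);
        return new String[] {id, message};
    }

    public static void main(String[] args) {
        // Made-up example bytes: "ab" (length 2 encoded as 4), "xyz" (length 3 encoded as 6).
        byte[] body = {4, 'a', 'b', 6, 'x', 'y', 'z'};
        String[] event = decodeEvent(body);
        System.out.println(event[0] + " / " + event[1]); // prints ab / xyz
    }
}
```

This is consistent with the raw output earlier in the thread: the `H` before each UUID is byte `0x48`, which zigzag-decodes to 36, the length of a UUID string. With `specificAvroReader(true)`, the deserializer has to map such a record onto a generated class, so that class must be on the classpath.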
[GitHub] [camel-quarkus] oscerd commented on issue #1960: camel-quarkus-kafka: Reconnection issues when working with schema registry
Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #1960:
URL: https://github.com/apache/camel-quarkus/issues/1960#issuecomment-750337645
Yes, I think we should add some docs about the specific case.
[GitHub] [camel-quarkus] tstuber closed issue #1960: camel-quarkus-kafka: Reconnection issues when working with schema registry
Posted by GitBox <gi...@apache.org>.
tstuber closed issue #1960:
URL: https://github.com/apache/camel-quarkus/issues/1960
[GitHub] [camel-quarkus] ppalaga commented on issue #1960: camel-quarkus-kafka: Reconnection issues when working with schema registry
Posted by GitBox <gi...@apache.org>.
ppalaga commented on issue #1960:
URL: https://github.com/apache/camel-quarkus/issues/1960#issuecomment-750332505
Thanks for the report and the thorough investigation, @tstuber, and sorry for the lack of help from our side. I wonder how we could improve the UX for folks interested in this use case in the future. I do not think there is anything Camel Quarkus specific about this use case, so an example or a piece of documentation should live in the main Camel project. WDYT @oscerd and other Kafka experts?
[GitHub] [camel-quarkus] tstuber commented on issue #1960: camel-quarkus-kafka: Reconnection issues when working with schema registry
Posted by GitBox <gi...@apache.org>.
tstuber commented on issue #1960:
URL: https://github.com/apache/camel-quarkus/issues/1960#issuecomment-718059534
I found a "workaround":
* Manually add the Avro schema to the project (e.g. `resources/avro/event.avsc`)
* Configure the `avro-maven-plugin` to generate the sources
Once the generated sources are available, the code works as listed above.
I am not sure whether this is the intended way to do it, since the documentation is not very clear to me. I also did not find good examples for this case. I could contribute one if someone could provide some input.
Last but not least, I am not sure if this is actually a camel-quarkus issue. I did not test this behavior with another setup.