You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2020/10/27 10:22:14 UTC

[GitHub] [camel-quarkus] tstuber opened a new issue #1960: camel-quarkus-kafka: Reconnection issues when working with schema registry

tstuber opened a new issue #1960:
URL: https://github.com/apache/camel-quarkus/issues/1960


   Hi all
   
   I am trying to consume Avro messages from Kafka by using camel-quarkus-kafka by including the schema-registry.
   
   **My test setup:**
   
   - local Kafka broker/zk
   - local schema registry
   - local Kafka event producer, writin new random avro messages (schema: fields with "id" and "message")
   
   **Issue**
   
   When I am involving the schema registry parameters, I run into an infinit loop of consumer reconnections. The log component is also never hit. I am not sure If I misconfigured something or if there is an issue involved in that. 
   What I would like to achieve is that a consumer derives the schema automatically and would convert the Avro schema into an pojo for further processing. 
   
           from(kafka("avro-events")
                   .brokers("{{kafka.bootstrap.servers}}")
                   .schemaRegistryURL("{{schema.registry.url}}")
                   .specificAvroReader(true)
                   .valueDeserializer("io.confluent.kafka.serializers.KafkaAvroDeserializer")
                   .resolve(getContext()))
   
                   .log("Avro Message: ${body}")
   
   ```
   2020-10-27 11:06:22,746 INFO  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) ConsumerConfig values: 
           allow.auto.create.topics = true
           auto.commit.interval.ms = 5000
           auto.offset.reset = latest
           bootstrap.servers = [localhost:9092]
           check.crcs = true
           client.dns.lookup = default
           client.id = 
           client.rack = 
           connections.max.idle.ms = 540000
           default.api.timeout.ms = 60000
           enable.auto.commit = true
           exclude.internal.topics = true
           fetch.max.bytes = 52428800
           fetch.max.wait.ms = 500
           fetch.min.bytes = 1
           group.id = 5919fa66-4bf2-47b2-b227-3d02769f3225
           group.instance.id = null
           heartbeat.interval.ms = 3000
           interceptor.classes = []
           internal.leave.group.on.close = true
           isolation.level = read_uncommitted
           key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
           max.partition.fetch.bytes = 1048576
           max.poll.interval.ms = 300000
           max.poll.records = 500
           metadata.max.age.ms = 300000
           metric.reporters = []
           metrics.num.samples = 2
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
           receive.buffer.bytes = 65536
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 40000
           retry.backoff.ms = 100
           sasl.client.callback.handler.class = null
           sasl.jaas.config = null
           sasl.kerberos.kinit.cmd = /usr/bin/kinit
           sasl.kerberos.min.time.before.relogin = 60000
           sasl.kerberos.service.name = null
           sasl.kerberos.ticket.renew.jitter = 0.05
           sasl.kerberos.ticket.renew.window.factor = 0.8
           sasl.login.callback.handler.class = null
           sasl.login.class = null
           sasl.login.refresh.buffer.seconds = 300
           sasl.login.refresh.min.period.seconds = 60
           sasl.login.refresh.window.factor = 0.8
           sasl.login.refresh.window.jitter = 0.05
           sasl.mechanism = GSSAPI
           security.protocol = PLAINTEXT
           security.providers = null
           send.buffer.bytes = 131072
           session.timeout.ms = 10000
           ssl.cipher.suites = null
           ssl.enabled.protocols = [TLSv1.2]
           ssl.endpoint.identification.algorithm = https
           ssl.key.password = null
           ssl.keymanager.algorithm = SunX509
           ssl.keystore.location = null
           ssl.keystore.password = null
           ssl.keystore.type = JKS
           ssl.protocol = TLSv1.2
           ssl.provider = null
           ssl.secure.random.implementation = null
           ssl.trustmanager.algorithm = PKIX
           ssl.truststore.location = null
           ssl.truststore.password = null
           ssl.truststore.type = JKS
           value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer
   
   2020-10-27 11:06:22,747 INFO  [io.con.kaf.ser.KafkaAvroDeserializerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) KafkaAvroDeserializerConfig values: 
           bearer.auth.token = [hidden]
           proxy.port = -1
           schema.reflection = false
           auto.register.schemas = true
           max.schemas.per.subject = 1000
           basic.auth.credentials.source = URL
           specific.avro.reader = true
           value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
           schema.registry.url = [localhost:8081]
           basic.auth.user.info = [hidden]
           proxy.host = 
           use.latest.version = false
           schema.registry.basic.auth.user.info = [hidden]
           bearer.auth.credentials.source = STATIC_TOKEN
           key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
   
   2020-10-27 11:06:22,753 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Kafka version: 2.5.0
   2020-10-27 11:06:22,753 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Kafka commitId: 66563e712b0b9f84
   2020-10-27 11:06:22,754 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Kafka startTimeMs: 1603793182753
   2020-10-27 11:06:22,754 INFO  [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Reconnecting avro-events-Thread 0 to topic avro-events after 5000 ms
   2020-10-27 11:06:27,755 INFO  [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Subscribing avro-events-Thread 0 to topic avro-events
   2020-10-27 11:06:27,757 INFO  [org.apa.kaf.cli.con.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Subscribed to topic(s): avro-events
   2020-10-27 11:06:27,774 INFO  [org.apa.kaf.cli.Metadata] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Cluster ID: CBXaaoMdSfu7ChosVF8hRQ
   2020-10-27 11:06:27,776 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
   2020-10-27 11:06:27,779 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] (Re-)joining group
   2020-10-27 11:06:27,792 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
   2020-10-27 11:06:27,793 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] (Re-)joining group
   2020-10-27 11:06:27,799 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Finished assignment for group at generation 3: {consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2-e4c3ddd6-e9da-4b80-bbf5-1631a3f08c9f=Assignment(partitions=[avro-events-0])}
   2020-10-27 11:06:27,802 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Successfully joined group with generation 3
   2020-10-27 11:06:27,802 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Adding newly assigned partitions: avro-events-0
   2020-10-27 11:06:27,808 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Setting offset for partition avro-events-0 to the committed offset FetchPosition{offset=914, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
   2020-10-27 11:06:27,816 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Revoke previously assigned partitions avro-events-0
   2020-10-27 11:06:27,816 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Member consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2-e4c3ddd6-e9da-4b80-bbf5-1631a3f08c9f sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer is being closed
   2020-10-27 11:06:27,823 INFO  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) ConsumerConfig values: 
           allow.auto.create.topics = true
           auto.commit.interval.ms = 5000
           auto.offset.reset = latest
           bootstrap.servers = [localhost:9092]
           check.crcs = true
           client.dns.lookup = default
           client.id = 
           client.rack = 
           connections.max.idle.ms = 540000
           default.api.timeout.ms = 60000
           enable.auto.commit = true
           exclude.internal.topics = true
           fetch.max.bytes = 52428800
           fetch.max.wait.ms = 500
           fetch.min.bytes = 1
           group.id = 5919fa66-4bf2-47b2-b227-3d02769f3225
           group.instance.id = null
           heartbeat.interval.ms = 3000
           interceptor.classes = []
           internal.leave.group.on.close = true
           isolation.level = read_uncommitted
           key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
           max.partition.fetch.bytes = 1048576
           max.poll.interval.ms = 300000
           max.poll.records = 500
           metadata.max.age.ms = 300000
           metric.reporters = []
           metrics.num.samples = 2
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
           receive.buffer.bytes = 65536
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 40000
           retry.backoff.ms = 100
           sasl.client.callback.handler.class = null
           sasl.jaas.config = null
           sasl.kerberos.kinit.cmd = /usr/bin/kinit
           sasl.kerberos.min.time.before.relogin = 60000
           sasl.kerberos.service.name = null
           sasl.kerberos.ticket.renew.jitter = 0.05
           sasl.kerberos.ticket.renew.window.factor = 0.8
           sasl.login.callback.handler.class = null
           sasl.login.class = null
           sasl.login.refresh.buffer.seconds = 300
           sasl.login.refresh.min.period.seconds = 60
           sasl.login.refresh.window.factor = 0.8
           sasl.login.refresh.window.jitter = 0.05
           sasl.mechanism = GSSAPI
           security.protocol = PLAINTEXT
           security.providers = null
           send.buffer.bytes = 131072
           session.timeout.ms = 10000
           ssl.cipher.suites = null
           ssl.enabled.protocols = [TLSv1.2]
           ssl.endpoint.identification.algorithm = https
           ssl.key.password = null
           ssl.keymanager.algorithm = SunX509
           ssl.keystore.location = null
           ssl.keystore.password = null
           ssl.keystore.type = JKS
           ssl.protocol = TLSv1.2
           ssl.provider = null
           ssl.secure.random.implementation = null
           ssl.trustmanager.algorithm = PKIX
           ssl.truststore.location = null
           ssl.truststore.password = null
           ssl.truststore.type = JKS
           value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer
   
   ```
   
   When I consume and log messages without involving schema registry, it works fine:
   
           from(kafka("avro-events")
                   .brokers("{{kafka.bootstrap.servers}}")
                   .resolve(getContext()))
   
                   .log("Avro Message: ${body}");
   
   Output: 
   ```
   2020-10-27 10:58:07,699 INFO  [route1] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Avro Message: �H786db2c3-c0ee-497e-ba61-dabca64f2729
   2020-10-27 10:58:10,710 INFO  [org.apa.cam.Tracing] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events])      [route1      ] [log                              ] Exchange[Id: D25D23EC4A3C0CF-000000000000001D, BodyType: String, Body: �H38dadb0e-75b3-484c-b499-f61c654512a5]
   2020-10-27 10:58:10,710 INFO  [route1] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Avro Message: �H38dadb0e-75b3-484c-b499-f61c654512a5
   2020-10-27 10:58:13,699 INFO  [org.apa.cam.Tracing] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events])      [route1      ] [log                              ] Exchange[Id: D25D23EC4A3C0CF-000000000000001E, BodyType: String, Body: �Ha53e0dfa-e302-4b1e-a5ff-db33bcf6972e]
   2020-10-27 10:58:13,699 INFO  [route1] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Avro Message: �Ha53e0dfa-e302-4b1e-a5ff-db33bcf6972e
   2020-10-27 10:58:16,707 INFO  [org.apa.cam.Tracing] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events])      [route1      ] [log                              ] Exchange[Id: D25D23EC4A3C0CF-000000000000001F, BodyType: String, Body: �Hc116e361-7831-4e06-9d1b-759b8bb516f5]
   2020-10-27 10:58:16,707 INFO  [route1] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Avro Message: �Hc116e361-7831-4e06-9d1b-759b8bb516f5
   ```
   
   Thanks a lot for your support.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-quarkus] tstuber commented on issue #1960: camel-quarkus-kafka: Reconnection issues when working with schema registry

Posted by GitBox <gi...@apache.org>.
tstuber commented on issue #1960:
URL: https://github.com/apache/camel-quarkus/issues/1960#issuecomment-750064745


   I am going to close this issue. 
   * Not having pojos generated on the base of an avro schema was my mistake. That is not an issue related to camel-quarkus
   * The issue with the infinite re-connection loop (that I faced during deserialisation due to wrong config) is already addressed here: https://issues.apache.org/jira/browse/CAMEL-14980


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-quarkus] oscerd commented on issue #1960: camel-quarkus-kafka: Reconnection issues when working with schema registry

Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #1960:
URL: https://github.com/apache/camel-quarkus/issues/1960#issuecomment-750337645


   Yes, I think we should add some docs about the specific case.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-quarkus] tstuber closed issue #1960: camel-quarkus-kafka: Reconnection issues when working with schema registry

Posted by GitBox <gi...@apache.org>.
tstuber closed issue #1960:
URL: https://github.com/apache/camel-quarkus/issues/1960


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-quarkus] ppalaga commented on issue #1960: camel-quarkus-kafka: Reconnection issues when working with schema registry

Posted by GitBox <gi...@apache.org>.
ppalaga commented on issue #1960:
URL: https://github.com/apache/camel-quarkus/issues/1960#issuecomment-750332505


   Thanks for the report and the thorough investigation, @tstuber and sorry for zero help from our side. I wonder how could we improve the UX for the folks interested in this use case in the future? I do not think there is anything Camel Quarkus specific on this use case, so an example or a piece of docs should hang on the main Camel. WDYT @oscerd and other Kafka experts? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-quarkus] tstuber commented on issue #1960: camel-quarkus-kafka: Reconnection issues when working with schema registry

Posted by GitBox <gi...@apache.org>.
tstuber commented on issue #1960:
URL: https://github.com/apache/camel-quarkus/issues/1960#issuecomment-718059534


   I found a "workaround":
   * Manually add the avro scheme into the project (e.g. `resources/avro/event.avsc`)
   * Configure the `avro-maven-plugin` to generate the sources
   
   Once the generated sources are available, the code works as listed above. 
   
   I am not sure if this is working as expected, since the documentation is not very clear to me. Also, I did not find good examples for this case. I could contribute here, if someone could provide some inputs here. 
   
   Last but not least, I am not sure if this is actually a camel-quarkus issue. I did not test this behavior with another setup.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org