Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2021/12/19 14:26:03 UTC

[GitHub] [camel-kafka-connector] gohanbg opened a new issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages

gohanbg opened a new issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300


   Hello,
   
   Apologies if this is not the right place for my question. I have a Kafka topic that I want to export to S3, so I am using the **CamelAws2s3SinkConnector**. The setup includes a Confluent schema registry. Everything works well except when the connector encounters a deleted Kafka message (a record with a null value). Then I get the following exception:
   ```
   2021-12-19 13:56:58,542 ERROR WorkerSinkTask{id=s3-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Exchange delivery has failed! (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-s3-sink-connector-0]
   org.apache.kafka.connect.errors.ConnectException: Exchange delivery has failed!
   	at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:199)
   	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
   	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
   	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
   	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
   	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
   	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
   	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
   	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.camel.InvalidPayloadException: No body available of type: java.lang.Object on: Message. Exchange[2719B1AB7DB587D-0000000000000003]
   	at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:79)
   	at org.apache.camel.component.aws2.s3.AWS2S3Producer.processSingleOp(AWS2S3Producer.java:258)
   	at org.apache.camel.component.aws2.s3.AWS2S3Producer.process(AWS2S3Producer.java:100)
   	at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
   	at org.apache.camel.processor.SendDynamicProcessor.lambda$process$0(SendDynamicProcessor.java:197)
   	at org.apache.camel.support.cache.DefaultProducerCache.doInAsyncProducer(DefaultProducerCache.java:318)
   	at org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:182)
   	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
   	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
   	at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
   	at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
   	at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
   	at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96)
   	at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:217)
   	at org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:111)
   	at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
   	at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:108)
   	at org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:190)
   	at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)
   	at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)
   	at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:194)
   	... 11 more
   ```
   
   This is my configuration:
   ```
   apiVersion: kafka.strimzi.io/v1beta2
   kind: KafkaConnector
   metadata:
     name: s3-sink-connector
     labels:
       strimzi.io/cluster: aws-connect
   spec:
     class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
     tasksMax: 1
     config:
       key.converter: org.apache.kafka.connect.storage.StringConverter
       value.converter: io.confluent.connect.avro.AvroConverter
       value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
       topics: my-topic
       camel.sink.path.bucketNameOrArn: my-bucket-name
       camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
       camel.sink.maxPollDuration: 10000
       transforms: tojson
       transforms.tojson.type: org.apache.camel.kafkaconnector.transforms.SchemaAndStructToJsonTransform
       transforms.tojson.converter.type: value
       camel.component.aws2-s3.accessKey: aws-key
       camel.component.aws2-s3.secretKey: aws-secret
       camel.component.aws2-s3.region: region
   
   ```
   
   What happens is the following:
   1. The connector starts reading the topic from the beginning.
   2. The first few messages are correctly transformed and uploaded to S3 (I can see them all appearing correctly, as actual JSON and not as Avro messages).
   3. We reach a Kafka null record (i.e. a deleted Kafka record).
   4. The above exception is thrown, the task is killed, and I can't continue.
   
   I'm using **strimzi v0.26.1**, with **kafka v3.0.0**, confluent **schema registry v7.0.0** and **camel-aws2-s3-kafka-connector v0.11.0**
   
   Regards
   Mihail


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] gohanbg commented on issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages

Posted by GitBox <gi...@apache.org>.
gohanbg commented on issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300#issuecomment-997673454


   Thank you for the reply @oscerd. Just to be clear: when you say component, do I need a different SMT (one that is OK with `null` values) for this to work, or something else? I.e., is the problem in `SchemaAndStructToJsonTransform`?





[GitHub] [camel-kafka-connector] oscerd commented on issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages

Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300#issuecomment-997607405


   This is how the underlying component works. The body is expected every time.





[GitHub] [camel-kafka-connector] gohanbg removed a comment on issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages

Posted by GitBox <gi...@apache.org>.
gohanbg removed a comment on issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300#issuecomment-997408748





[GitHub] [camel-kafka-connector] gohanbg commented on issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages

Posted by GitBox <gi...@apache.org>.
gohanbg commented on issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300#issuecomment-997408748


   I tried the plain string converter instead of the Avro converter:
   
   ```
   apiVersion: kafka.strimzi.io/v1beta2
   kind: KafkaConnector
   metadata:
     name: s3-sink-connector21
     labels:
       strimzi.io/cluster: aws-connect
   spec:
     class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
     tasksMax: 1
     config:
       key.converter: org.apache.kafka.connect.storage.StringConverter
       value.converter: org.apache.kafka.connect.storage.StringConverter
       #value.converter: io.confluent.connect.avro.AvroConverter
       #value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
       topics: advertisements
       camel.sink.path.bucketNameOrArn: cc-kafka-test-bucket
       camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
       camel.sink.maxPollDuration: 10000
       transforms: tojson
       transforms.tojson.type: org.apache.camel.kafkaconnector.aws2s3.transformers.RecordToJSONTransforms
       camel.component.aws2-s3.accessKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_access_key_id}
       camel.component.aws2-s3.secretKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_secret_access_key}
       camel.component.aws2-s3.region: eu-central-1
   
   ```





[GitHub] [camel-kafka-connector] gohanbg closed issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages

Posted by GitBox <gi...@apache.org>.
gohanbg closed issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300


   







[GitHub] [camel-kafka-connector] gohanbg commented on issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages

Posted by GitBox <gi...@apache.org>.
gohanbg commented on issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300#issuecomment-997408975


   Apologies, closed by mistake





[GitHub] [camel-kafka-connector] oscerd commented on issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages

Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300#issuecomment-997673981


   I mean the Camel component used by the Kafka connector. Essentially, you cannot send a null value to the connector.
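   
   One possible workaround, offered as an assumption rather than something confirmed in this thread, is to drop tombstone (null-value) records before they reach the sink, using Kafka Connect's built-in `Filter` SMT with the `RecordIsTombstone` predicate (both available since Kafka 2.6). The alias names `dropTombstones` and `isTombstone` are arbitrary, and this fragment has not been tested against camel-aws2-s3-kafka-connector v0.11.0:
   ```
   config:
     # Run the filter first so the JSON transform never sees a null value.
     transforms: dropTombstones,tojson
     transforms.dropTombstones.type: org.apache.kafka.connect.transforms.Filter
     transforms.dropTombstones.predicate: isTombstone
     # Predicate that matches records whose value is null.
     predicates: isTombstone
     predicates.isTombstone.type: org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
     # Existing JSON transform from the original configuration, unchanged.
     transforms.tojson.type: org.apache.camel.kafkaconnector.transforms.SchemaAndStructToJsonTransform
     transforms.tojson.converter.type: value
   ```
   With such a filter in place, deleted records would be skipped entirely, so no S3 object would be written for them.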





[GitHub] [camel-kafka-connector] gohanbg commented on issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages

Posted by GitBox <gi...@apache.org>.
gohanbg commented on issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300#issuecomment-997675805


   I understand. Thank you

