You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2021/12/19 14:26:03 UTC
[GitHub] [camel-kafka-connector] gohanbg opened a new issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages
gohanbg opened a new issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300
Hello,
Apologies if my question is not for here. I have a kafka topic, that I want to export to S3, therefore using the **CamelAws2s3SinkConnector**. It has a confluent schema registry. All works well, but when I except when encountering a deleted kafka message. Then I get the following exception:
```
2021-12-19 13:56:58,542 ERROR WorkerSinkTask{id=s3-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Exchange delivery has failed! (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-s3-sink-connector-0]
org.apache.kafka.connect.errors.ConnectException: Exchange delivery has failed!
at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:199)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: java.lang.Object on: Message. Exchange[2719B1AB7DB587D-0000000000000003]
at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:79)
at org.apache.camel.component.aws2.s3.AWS2S3Producer.processSingleOp(AWS2S3Producer.java:258)
at org.apache.camel.component.aws2.s3.AWS2S3Producer.process(AWS2S3Producer.java:100)
at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
at org.apache.camel.processor.SendDynamicProcessor.lambda$process$0(SendDynamicProcessor.java:197)
at org.apache.camel.support.cache.DefaultProducerCache.doInAsyncProducer(DefaultProducerCache.java:318)
at org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:182)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96)
at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:217)
at org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:111)
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:108)
at org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:190)
at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)
at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)
at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:194)
... 11 more
```
This is my configuration:
```
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: s3-sink-connector
labels:
strimzi.io/cluster: aws-connect
spec:
class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
tasksMax: 1
config:
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
topics: my-topic
camel.sink.path.bucketNameOrArn: my-bucket-name
camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
camel.sink.maxPollDuration: 10000
transforms: tojson
transforms.tojson.type: org.apache.camel.kafkaconnector.transforms.SchemaAndStructToJsonTransform
transforms.tojson.converter.type: value
camel.component.aws2-s3.accessKey: aws-key
camel.component.aws2-s3.secretKey: aws-secret
camel.component.aws2-s3.region: region
```
What happens is the following:
1. The connector starts reading the topic from beginning
2. The first few messages are correctly transformed and uploaded to S3 (i can see them all appearing correct, with actual JSON and not an avro message)
3. We reach a kafka null record (i.e. deleted kafka record)
4. The above exception is thrown and I can't continue
I'm using **strimzi v0.26.1**, with **kafka v3.0.0**, confluent **schema registry v7.0.0** and **camel-aws2-s3-kafka-connector v0.11.0**
Regards
Mihail
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] gohanbg commented on issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages
Posted by GitBox <gi...@apache.org>.
gohanbg commented on issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300#issuecomment-997673454
Thank you for the reply @oscerd . Just to be clear when you say component, do I need a different SMT in order for this to work (that is ok with `nul`l values), or something else? I.e. is the problem in `SchemaAndStructToJsonTransform`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] oscerd commented on issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages
Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300#issuecomment-997607405
This is how the underline component work. The body is expected everytime
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] gohanbg removed a comment on issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages
Posted by GitBox <gi...@apache.org>.
gohanbg removed a comment on issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300#issuecomment-997408748
I tried with the simple string converter instead of Avro converter
```
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: s3-sink-connector21
labels:
strimzi.io/cluster: aws-connect
spec:
class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
tasksMax: 1
config:
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.storage.StringConverter
#value.converter: io.confluent.connect.avro.AvroConverter
#value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
topics: advertisements
camel.sink.path.bucketNameOrArn: cc-kafka-test-bucket
camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
camel.sink.maxPollDuration: 10000
transforms: tojson
transforms.tojson.type: org.apache.camel.kafkaconnector.aws2s3.transformers.RecordToJSONTransforms
camel.component.aws2-s3.accessKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_access_key_id}
camel.component.aws2-s3.secretKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_secret_access_key}
camel.component.aws2-s3.region: eu-central-1
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] gohanbg commented on issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages
Posted by GitBox <gi...@apache.org>.
gohanbg commented on issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300#issuecomment-997408748
I tried with the simple string converter instead of Avro converter
```
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: s3-sink-connector21
labels:
strimzi.io/cluster: aws-connect
spec:
class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
tasksMax: 1
config:
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.storage.StringConverter
#value.converter: io.confluent.connect.avro.AvroConverter
#value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
topics: advertisements
camel.sink.path.bucketNameOrArn: cc-kafka-test-bucket
camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
camel.sink.maxPollDuration: 10000
transforms: tojson
transforms.tojson.type: org.apache.camel.kafkaconnector.aws2s3.transformers.RecordToJSONTransforms
camel.component.aws2-s3.accessKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_access_key_id}
camel.component.aws2-s3.secretKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_secret_access_key}
camel.component.aws2-s3.region: eu-central-1
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] gohanbg closed issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages
Posted by GitBox <gi...@apache.org>.
gohanbg closed issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] gohanbg closed issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages
Posted by GitBox <gi...@apache.org>.
gohanbg closed issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] gohanbg commented on issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages
Posted by GitBox <gi...@apache.org>.
gohanbg commented on issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300#issuecomment-997408975
Apologies, closed by mistake
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] oscerd commented on issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages
Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300#issuecomment-997673981
I mean the camel component used by the Kafka connector. Essentially you cannot send null value to the connector
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] gohanbg commented on issue #1300: CamelAws2s3SinkConnector not processing kafka null (deleted) messages
Posted by GitBox <gi...@apache.org>.
gohanbg commented on issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300#issuecomment-997675805
I understand. Thank you
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org