You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2021/03/17 08:18:45 UTC
[GitHub] [camel-kafka-connector] jonaswagner opened a new issue #1107: aws2 s3 source connector ClassCastException for value.conver
jonaswagner opened a new issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107
Hi all,
This issue relates to the conversation from the Camel Zulipat chat: https://camel.zulipchat.com/#narrow/stream/257303-camel-kafka-connector/topic/aws2.20s3.20source.20connector.20ClassCastException.20for.20value.2Econver
As discussed with @luigidemasi, I get a ClassCastException, when I try to use the value converter property for my aws2 s3 source connector.
This is the config I used:
spec:
class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
tasksMax: 1
config:
camel.source.path.bucketNameOrArn: my-bucket-name
camel.source.endpoint.autoCreateBucket: false
camel.source.endpoint.region: my-region
camel.source.endpoint.deleteAfterRead: true
camel.source.endpoint.fileName: devl/shared/targetfile.csv
camel.source.endpoint.accessKey: "*"
camel.source.endpoint.secretKey: "*"
camel.source.maxPollDuration: 10000
camel.source.endpoint.delay: 10000
camel.source.endpoint.autocloseBody: false
#key.converter: org.apache.kafka.connect.storage.StringConverter
#value.converter: org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter
topics: test-name.targetfile.source.s3
The exception I get is the following:
2021-03-16 07:23:23,385 INFO WorkerSourceTask{id=test-name-source-s3-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask) [task-thread-test-name-source-s3-0]
2021-03-16 07:23:23,385 INFO WorkerSourceTask{id=test-name-source-s3-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask) [task-thread-test-name-source-s3-0]
2021-03-16 07:23:23,385 ERROR WorkerSourceTask{id=test-name-source-s3-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-test-name-source-s3-0]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:295)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to com.amazonaws.services.s3.model.S3ObjectInputStream
at org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter.fromConnectData(S3ObjectConverter.java:37)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:295)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more
2021-03-16 07:23:23,385 ERROR WorkerSourceTask{id=test-name-source-s3-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-test-name-source-s3-0]
My expectation would be that I can upload binaries to S3 and convert them into S3Objects for my Kafka Topic.
Thank you for investigating this issue
Jonas
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] jonaswagner closed issue #1107: aws2 s3 source connector ClassCastException for value.conver
Posted by GitBox <gi...@apache.org>.
jonaswagner closed issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] luigidemasi edited a comment on issue #1107: aws2 s3 source connector ClassCastException for value.conver
Posted by GitBox <gi...@apache.org>.
luigidemasi edited a comment on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801487018
@jonaswagner, @oscerd is right: in a source task, a converter is used to convert incoming data to a byte array using fromConnectData method from [Converter](https://github.com/apache/kafka/blob/9adfac280392da0837cfd8d582bc540951e94087/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L46) interface but you already have byte array in your body, that's the reason for the error.
So you can use `org.apache.kafka.connect.converters.ByteArrayConverter` to have a raw byte array in your topic, or not using a converter at all and get a JSON with schema and in the payload the base64 representation of the byte array:
```json
{
"schema":{
"type":"bytes",
"optional":false
},
"payload":"/9j/4gIcSUNDX1BST0ZJTEUAAQEAAAIMbGNtcwIQAABtbnRyUkdCIFhZWiAH3AABABkAAwApADlhY3NwQVBQTAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA9tYAAQAAAADTLWxjbXMAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAApkZXNjAAAA/AAAAF5jcHJ0AAABXAAAAAt3dHB0AAABaAAAABRia3B0AAABfAAAABRyWFlaAAABkAAAABRnWFlaAAABpAAAABRiWFlaAAABuAAAABRyVFJDAAABzAAAAEBnVFJDAAABzAAAAEBiVFJDAAABzAAAAEBkZXNjAAAAAAAAAANjMgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB0ZXh0AAAAAEZCAABYWVogAAAAAAAA9tYAAQAAAADTLVhZWiAAAAAAAAADFgAAAzMAAAKkWFlaIAAAAAAAAG+iAAA49QAAA5BYWVogAAAAAAAAYpkAALeFAAAY2lhZWiAAAAAAAAAkoAAAD4QAALbPY3Vyd....."
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] oscerd commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver
Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801449592
Try with the bytearray converter.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] jonaswagner commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver
Posted by GitBox <gi...@apache.org>.
jonaswagner commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801743463
Resolved. The main problem was the usage of an old connector version (0.5.0 instead of the newer 0.8.0) and the missunderstanding of the documentation.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] oscerd commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver
Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801719639
Yes, please :-)
It's true that the converter is supported out of the box. It is here: https://github.com/apache/camel-kafka-connector/blob/master/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/converters/S3ObjectConverter.java
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] jonaswagner commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver
Posted by GitBox <gi...@apache.org>.
jonaswagner commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801717479
You are right, I was blindly reading the documentation (see https://camel.apache.org/camel-kafka-connector/latest/connectors/camel-aws2-s3-kafka-source-connector.html), which says that the connector supports the S3 Converter out of the box.
I was also confused by the sample configuration of the aws-s3-kafka-source-connector (not aws2) example, which contains a StringConverter and the S3ObjectConverter (see https://camel.apache.org/camel-kafka-connector/latest/connectors/camel-aws-s3-kafka-source-connector.html#_examples)
Now that I have everything working I share my final configuration, which is:
spec:
class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
tasksMax: 1
config:
camel.source.path.bucketNameOrArn: my-bucket-name
camel.source.endpoint.autoCreateBucket: false
camel.source.endpoint.region: my-region
camel.source.endpoint.deleteAfterRead: true
camel.source.endpoint.fileName: devl/shared/targetfile.csv
camel.source.endpoint.accessKey: ""
camel.source.endpoint.secretKey: ""
camel.source.maxPollDuration: 10000
camel.source.endpoint.delay: 10000
camel.source.endpoint.autocloseBody: false
topics: test-name.targetfile.source.s3
Thank you very much for helping me :)
Should I close this issue now?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] luigidemasi edited a comment on issue #1107: aws2 s3 source connector ClassCastException for value.conver
Posted by GitBox <gi...@apache.org>.
luigidemasi edited a comment on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801487018
@jonaswagner, @oscerd is right: in a source task, a converter is used to convert incoming data to a byte array using fromConnectData method from [Converter](https://github.com/apache/kafka/blob/9adfac280392da0837cfd8d582bc540951e94087/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L46) interface but you already have byte array in your body, that's the reason for the error.
So you can use `org.apache.kafka.connect.converters.ByteArrayConverter` to have a raw byte array in your topic, or not using a converter at all and get a json with schema:
```json
{
"schema":{
"type":"bytes",
"optional":false
},
"payload":"/9j/4gIcSUNDX1BST0ZJTEUAAQEAAAIMbGNtcwIQAABtbnRyUkdCIFhZWiAH3AABABkAAwApADlhY3NwQVBQTAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA9tYAAQAAAADTLWxjbXMAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAApkZXNjAAAA/AAAAF5jcHJ0AAABXAAAAAt3dHB0AAABaAAAABRia3B0AAABfAAAABRyWFlaAAABkAAAABRnWFlaAAABpAAAABRiWFlaAAABuAAAABRyVFJDAAABzAAAAEBnVFJDAAABzAAAAEBiVFJDAAABzAAAAEBkZXNjAAAAAAAAAANjMgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB0ZXh0AAAAAEZCAABYWVogAAAAAAAA9tYAAQAAAADTLVhZWiAAAAAAAAADFgAAAzMAAAKkWFlaIAAAAAAAAG+iAAA49QAAA5BYWVogAAAAAAAAYpkAALeFAAAY2lhZWiAAAAAAAAAkoAAAD4QAALbPY3Vyd....."
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] luigidemasi commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver
Posted by GitBox <gi...@apache.org>.
luigidemasi commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801487018
@jonaswagner, @oscerd is right: in a source task, a converter is used to convert incoming data to a byte array using fromConnectData method from [Converter](https://github.com/apache/kafka/blob/9adfac280392da0837cfd8d582bc540951e94087/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L46) interface but you already have byte array in your body, that's the reason for the error.
So you can use `org.apache.kafka.connect.converters.ByteArrayConverter` to have a raw byte array in your topic, or not using a converter at all and get a json with schema:
```json
{"schema":{"type":"bytes","optional":false},"payload":"/9j/4gIcSUNDX1BST0ZJTEUAAQEAAAIMbGNtcwIQAABtbnRyUkdCIFhZWiAH3AABABkAAwApADlhY3NwQVBQTAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA9tYAAQAAAADTLWxjbXMAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAApkZXNjAAAA/AAAAF5jcHJ0AAABXAAAAAt3dHB0AAABaAAAABRia3B0AAABfAAAABRyWFlaAAABkAAAABRnWFlaAAABpAAAABRiWFlaAAABuAAAABRyVFJDAAABzAAAAEBnVFJDAAABzAAAAEBiVFJDAAABzAAAAEBkZXNjAAAAAAAAAANjMgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB0ZXh0AAAAAEZCAABYWVogAAAAAAAA9tYAAQAAAADTLVhZWiAAAAAAAAADFgAAAzMAAAKkWFlaIAAAAAAAAG+iAAA49QAAA5BYWVogAAAAAAAAYpkAALeFAAAY2lhZWiAAAAAAAAAkoAAAD4QAALbPY3Vyd....."}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] oscerd commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver
Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-800892989
What is the connector version?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] oscerd commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver
Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-800956436
We reworked the underline component please try with 0.7.3 or 0.8.0
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] jonaswagner commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver
Posted by GitBox <gi...@apache.org>.
jonaswagner commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801244602
Ok, I tried the same config as above with version 0.8.0. Now I get a different exception:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:295)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: [B cannot be cast to software.amazon.awssdk.core.ResponseInputStream
at org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter.fromConnectData(S3ObjectConverter.java:37)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:295)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more
It seems to me, that the S3ObjectConverter does not work.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [camel-kafka-connector] jonaswagner commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver
Posted by GitBox <gi...@apache.org>.
jonaswagner commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-800953711
We are currently using Version 0.5.0
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org