You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2021/03/17 08:18:45 UTC

[GitHub] [camel-kafka-connector] jonaswagner opened a new issue #1107: aws2 s3 source connector ClassCastException for value.conver

jonaswagner opened a new issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107


   Hi all,
   
   This issue relates to the conversation from the Camel Zulipat chat: https://camel.zulipchat.com/#narrow/stream/257303-camel-kafka-connector/topic/aws2.20s3.20source.20connector.20ClassCastException.20for.20value.2Econver
   
   As discussed with @luigidemasi, I get a ClassCastException, when I try to use the value converter property for my aws2 s3 source connector.
   
   This is the config I used:
   
   spec:
   class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
   tasksMax: 1
   config:
   camel.source.path.bucketNameOrArn: my-bucket-name
   camel.source.endpoint.autoCreateBucket: false
   camel.source.endpoint.region: my-region
   camel.source.endpoint.deleteAfterRead: true
   camel.source.endpoint.fileName: devl/shared/targetfile.csv
   camel.source.endpoint.accessKey: "*"
   camel.source.endpoint.secretKey: "*"
   camel.source.maxPollDuration: 10000
   camel.source.endpoint.delay: 10000
   camel.source.endpoint.autocloseBody: false
   #key.converter: org.apache.kafka.connect.storage.StringConverter
   #value.converter: org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter
   topics: test-name.targetfile.source.s3
   
   The exception I get is the following:
   
   2021-03-16 07:23:23,385 INFO WorkerSourceTask{id=test-name-source-s3-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask) [task-thread-test-name-source-s3-0]
   2021-03-16 07:23:23,385 INFO WorkerSourceTask{id=test-name-source-s3-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask) [task-thread-test-name-source-s3-0]
   2021-03-16 07:23:23,385 ERROR WorkerSourceTask{id=test-name-source-s3-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-test-name-source-s3-0]
   org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
   at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:295)
   at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:321)
   at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)
   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to com.amazonaws.services.s3.model.S3ObjectInputStream
   at org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter.fromConnectData(S3ObjectConverter.java:37)
   at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
   at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:295)
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
   ... 11 more
   2021-03-16 07:23:23,385 ERROR WorkerSourceTask{id=test-name-source-s3-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-test-name-source-s3-0]
   
   My expectation would be that I can upload binaries to S3 and convert them into S3Objects for my Kafka Topic.
   
   Thank you for investigating this issue
   Jonas
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] jonaswagner closed issue #1107: aws2 s3 source connector ClassCastException for value.conver

Posted by GitBox <gi...@apache.org>.
jonaswagner closed issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] luigidemasi edited a comment on issue #1107: aws2 s3 source connector ClassCastException for value.conver

Posted by GitBox <gi...@apache.org>.
luigidemasi edited a comment on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801487018


   @jonaswagner, @oscerd is right: in a source task, a converter is used to convert incoming data to a byte array using fromConnectData method from [Converter](https://github.com/apache/kafka/blob/9adfac280392da0837cfd8d582bc540951e94087/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L46) interface but you already have byte array in your body, that's the reason for the error.
   
   So you can use `org.apache.kafka.connect.converters.ByteArrayConverter` to have a raw byte array in your topic, or not using a converter at all and get a JSON with schema and in the payload the base64 representation of the byte array:
   
   ```json
   {
      "schema":{
         "type":"bytes",
         "optional":false
      },
      "payload":"/9j/4gIcSUNDX1BST0ZJTEUAAQEAAAIMbGNtcwIQAABtbnRyUkdCIFhZWiAH3AABABkAAwApADlhY3NwQVBQTAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA9tYAAQAAAADTLWxjbXMAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAApkZXNjAAAA/AAAAF5jcHJ0AAABXAAAAAt3dHB0AAABaAAAABRia3B0AAABfAAAABRyWFlaAAABkAAAABRnWFlaAAABpAAAABRiWFlaAAABuAAAABRyVFJDAAABzAAAAEBnVFJDAAABzAAAAEBiVFJDAAABzAAAAEBkZXNjAAAAAAAAAANjMgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB0ZXh0AAAAAEZCAABYWVogAAAAAAAA9tYAAQAAAADTLVhZWiAAAAAAAAADFgAAAzMAAAKkWFlaIAAAAAAAAG+iAAA49QAAA5BYWVogAAAAAAAAYpkAALeFAAAY2lhZWiAAAAAAAAAkoAAAD4QAALbPY3Vyd....."
   }
   ```
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] oscerd commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver

Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801449592


   Try with the bytearray converter.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] jonaswagner commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver

Posted by GitBox <gi...@apache.org>.
jonaswagner commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801743463


   Resolved. The main problem was the usage of an old connector version (0.5.0 instead of the newer 0.8.0) and the missunderstanding of the documentation.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] oscerd commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver

Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801719639


   Yes, please :-)
   
   It's true that the converter is supported out of the box. It is here: https://github.com/apache/camel-kafka-connector/blob/master/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/converters/S3ObjectConverter.java


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] jonaswagner commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver

Posted by GitBox <gi...@apache.org>.
jonaswagner commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801717479


   You are right, I was blindly reading the documentation (see https://camel.apache.org/camel-kafka-connector/latest/connectors/camel-aws2-s3-kafka-source-connector.html), which says that the connector supports the S3 Converter out of the box. 
   
   I was also confused by the sample configuration of the aws-s3-kafka-source-connector (not aws2) example, which contains a StringConverter and the S3ObjectConverter (see https://camel.apache.org/camel-kafka-connector/latest/connectors/camel-aws-s3-kafka-source-connector.html#_examples)
   
   Now that I have everything working I share my final configuration, which is:
   
   spec:
   class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
   tasksMax: 1
   config:
   camel.source.path.bucketNameOrArn: my-bucket-name
   camel.source.endpoint.autoCreateBucket: false
   camel.source.endpoint.region: my-region
   camel.source.endpoint.deleteAfterRead: true
   camel.source.endpoint.fileName: devl/shared/targetfile.csv
   camel.source.endpoint.accessKey: ""
   camel.source.endpoint.secretKey: ""
   camel.source.maxPollDuration: 10000
   camel.source.endpoint.delay: 10000
   camel.source.endpoint.autocloseBody: false
   topics: test-name.targetfile.source.s3
   
   Thank you very much for helping me :)
   
   Should I close this issue now?
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] luigidemasi edited a comment on issue #1107: aws2 s3 source connector ClassCastException for value.conver

Posted by GitBox <gi...@apache.org>.
luigidemasi edited a comment on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801487018


   @jonaswagner, @oscerd is right: in a source task, a converter is used to convert incoming data to a byte array using fromConnectData method from [Converter](https://github.com/apache/kafka/blob/9adfac280392da0837cfd8d582bc540951e94087/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L46) interface but you already have byte array in your body, that's the reason for the error.
   
   So you can use `org.apache.kafka.connect.converters.ByteArrayConverter` to have a raw byte array in your topic, or not using a converter at all and get a json with schema:
   
   ```json
   {
      "schema":{
         "type":"bytes",
         "optional":false
      },
      "payload":"/9j/4gIcSUNDX1BST0ZJTEUAAQEAAAIMbGNtcwIQAABtbnRyUkdCIFhZWiAH3AABABkAAwApADlhY3NwQVBQTAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA9tYAAQAAAADTLWxjbXMAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAApkZXNjAAAA/AAAAF5jcHJ0AAABXAAAAAt3dHB0AAABaAAAABRia3B0AAABfAAAABRyWFlaAAABkAAAABRnWFlaAAABpAAAABRiWFlaAAABuAAAABRyVFJDAAABzAAAAEBnVFJDAAABzAAAAEBiVFJDAAABzAAAAEBkZXNjAAAAAAAAAANjMgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB0ZXh0AAAAAEZCAABYWVogAAAAAAAA9tYAAQAAAADTLVhZWiAAAAAAAAADFgAAAzMAAAKkWFlaIAAAAAAAAG+iAAA49QAAA5BYWVogAAAAAAAAYpkAALeFAAAY2lhZWiAAAAAAAAAkoAAAD4QAALbPY3Vyd....."
   }
   ```
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] luigidemasi commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver

Posted by GitBox <gi...@apache.org>.
luigidemasi commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801487018


   @jonaswagner, @oscerd is right: in a source task, a converter is used to convert incoming data to a byte array using fromConnectData method from [Converter](https://github.com/apache/kafka/blob/9adfac280392da0837cfd8d582bc540951e94087/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L46) interface but you already have byte array in your body, that's the reason for the error.
   
   So you can use `org.apache.kafka.connect.converters.ByteArrayConverter` to have a raw byte array in your topic, or not using a converter at all and get a json with schema:
   
   ```json
   {"schema":{"type":"bytes","optional":false},"payload":"/9j/4gIcSUNDX1BST0ZJTEUAAQEAAAIMbGNtcwIQAABtbnRyUkdCIFhZWiAH3AABABkAAwApADlhY3NwQVBQTAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA9tYAAQAAAADTLWxjbXMAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAApkZXNjAAAA/AAAAF5jcHJ0AAABXAAAAAt3dHB0AAABaAAAABRia3B0AAABfAAAABRyWFlaAAABkAAAABRnWFlaAAABpAAAABRiWFlaAAABuAAAABRyVFJDAAABzAAAAEBnVFJDAAABzAAAAEBiVFJDAAABzAAAAEBkZXNjAAAAAAAAAANjMgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB0ZXh0AAAAAEZCAABYWVogAAAAAAAA9tYAAQAAAADTLVhZWiAAAAAAAAADFgAAAzMAAAKkWFlaIAAAAAAAAG+iAAA49QAAA5BYWVogAAAAAAAAYpkAALeFAAAY2lhZWiAAAAAAAAAkoAAAD4QAALbPY3Vyd....."}
   ```
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] oscerd commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver

Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-800892989


   What is the connector version?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] oscerd commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver

Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-800956436


   We reworked the underline component please try with 0.7.3 or 0.8.0


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] jonaswagner commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver

Posted by GitBox <gi...@apache.org>.
jonaswagner commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-801244602


   Ok, I tried the same config as above with version 0.8.0. Now I get a different exception:
   
   org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
                 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
                 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
                 at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:295)
                 at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:321)
                 at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)
                 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
                 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
                 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                 at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.ClassCastException: [B cannot be cast to software.amazon.awssdk.core.ResponseInputStream
     at org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter.fromConnectData(S3ObjectConverter.java:37)
     at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
     at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:295)
     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
     ... 11 more
   
   It seems to me, that the S3ObjectConverter does not work. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] jonaswagner commented on issue #1107: aws2 s3 source connector ClassCastException for value.conver

Posted by GitBox <gi...@apache.org>.
jonaswagner commented on issue #1107:
URL: https://github.com/apache/camel-kafka-connector/issues/1107#issuecomment-800953711


   We are currently using Version 0.5.0


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org