Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2021/01/07 21:20:10 UTC

[GitHub] [camel-kafka-connector] ruchirvaninasdaq opened a new issue #834: Question: S3 file get moved on failure also

ruchirvaninasdaq opened a new issue #834:
URL: https://github.com/apache/camel-kafka-connector/issues/834


   Hello, 
   
   I am using the `aws2-s3-kafka-source-connector` connector (https://github.com/apache/camel-kafka-connector/tree/camel-kafka-connector-0.7.x/connectors/camel-aws2-s3-kafka-connector) with the following configs.
   
   ```
   apiVersion: kafka.strimzi.io/v1alpha1
   kind: KafkaConnector
   metadata:
     name: name-connector
     labels:
       strimzi.io/cluster: benzinga-kafka-connect-cluster
   spec:
     tasksMax: 1
     class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
     config:
       client.id: client
       topics: topic
       connector.class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
       key.converter: org.apache.kafka.connect.storage.StringConverter
       value.converter: org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter
       camel.source.kafka.topic: topic
       camel.source.url: aws2-s3://source-bucket?useDefaultCredentialsProvider=true&moveAfterRead=true&destinationBucket=destination-bucket
       camel.source.maxPollDuration: 10
       camel.source.maxBatchPollSize: 1000
       camel.component.aws2-s3.includeBody: false
       camel.source.endpoint.useDefaultCredentialsProvider : true
       camel.component.aws2-s3.autocloseBody : true
   ```
   
   I have updated the S3ObjectConverter with my own custom serializer. The code is as follows:
   ```
    import java.io.IOException;
    import java.util.Map;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.storage.Converter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import software.amazon.awssdk.core.ResponseInputStream;
    import software.amazon.awssdk.services.s3.model.GetObjectResponse;

    public class S3ObjectConverter implements Converter {
   
       private static final Logger LOG = LoggerFactory.getLogger(S3ObjectConverter.class);
       //private final S3ObjectSerializer serializer = new S3ObjectSerializer();
       private final S3ObjectAvroSerializer serializer;
   
       public S3ObjectConverter() throws IOException {
           serializer = new S3ObjectAvroSerializer();
       }
   
       @Override
       public void configure(Map<String, ?> configs, boolean isKey) {
       }
   
       @Override
       public byte[] fromConnectData(String topic, Schema schema, Object value) {
           return serializer.serialize(topic, (ResponseInputStream<GetObjectResponse>)value);
       }
   
       @Override
        public SchemaAndValue toConnectData(String arg0, byte[] arg1) {
            // Not used here: only fromConnectData is exercised by the source connector.
            return null;
       }
   
   }
   ```
   
   This works as expected: the object gets serialized, added to the Kafka topic, and the file is moved to the destination bucket.
   
   My problem is with failure cases:
   When an object fails during serialization, it still gets moved to the destination bucket (I expect it to stay in the source bucket). Is there a config option I am using wrong?
   
   Thank you. 
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] ruchirvaninasdaq commented on issue #834: Question: S3 file get moved on failure also

Posted by GitBox <gi...@apache.org>.
ruchirvaninasdaq commented on issue #834:
URL: https://github.com/apache/camel-kafka-connector/issues/834#issuecomment-756419315


   Thanks for the feedback. I will update my config. 
   
   I have seen that if the Kafka cluster goes down, the exchange is still marked as completed. What is a better way to handle Kafka exceptions with the Kafka connector?
   
   Thanks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] ruchirvaninasdaq edited a comment on issue #834: Question: S3 file get moved on failure also

Posted by GitBox <gi...@apache.org>.
ruchirvaninasdaq edited a comment on issue #834:
URL: https://github.com/apache/camel-kafka-connector/issues/834#issuecomment-756828139


   Yeah, I have been running this connector for 7 months and never saw issues, but a few weeks ago something went wrong with our Kafka cluster and it failed, which is when I ran into this issue.
   
   To reproduce the issue, I added Thread.sleep(100000) in the serializer class, deleted the cluster, and was able to reproduce it. (I guess you must know a better way to do it ;) )
   
   Please let me know if you have any suggestions/recommendations for handling such errors in the future. (It happens very rarely, but I would like to handle it if possible.)
   
   Thank you!! 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] ruchirvaninasdaq commented on issue #834: Question: S3 file get moved on failure also

Posted by GitBox <gi...@apache.org>.
ruchirvaninasdaq commented on issue #834:
URL: https://github.com/apache/camel-kafka-connector/issues/834#issuecomment-756784825


   @oscerd: Any recommendations on how to handle Kafka exceptions with the Camel Kafka connector? Please give me some guidance; I can try implementing it on my own. Thanks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] ruchirvaninasdaq commented on issue #834: Question: S3 file get moved on failure also

Posted by GitBox <gi...@apache.org>.
ruchirvaninasdaq commented on issue #834:
URL: https://github.com/apache/camel-kafka-connector/issues/834#issuecomment-756819644


   ``` 
   if the cluster goes down but Camel has already completed the exchange and moved the file to the destination bucket, there is no way to roll back the situation from a Camel perspective
   ```
   Thank you for the information.
   Is it within the connector's scope to handle this kind of rollback in the future? Or is it just me who is running into this problem?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] valdar commented on issue #834: Question: S3 file get moved on failure also

Posted by GitBox <gi...@apache.org>.
valdar commented on issue #834:
URL: https://github.com/apache/camel-kafka-connector/issues/834#issuecomment-847728909


   @ruchirvaninasdaq this should have been addressed by https://github.com/apache/camel-kafka-connector/issues/202; can you try with a recent version of the connector, like `0.9.0` or the upcoming `0.10.0`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] ruchirvaninasdaq commented on issue #834: Question: S3 file get moved on failure also

Posted by GitBox <gi...@apache.org>.
ruchirvaninasdaq commented on issue #834:
URL: https://github.com/apache/camel-kafka-connector/issues/834#issuecomment-756394830


   I expected this behavior based on this: https://github.com/apache/camel-kafka-connector/blob/dd9ccfa5dc2a0740ef1ed636cf999da41e6bbbcb/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/CamelAws2s3SourceConnectorConfig.java#L270


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] ruchirvaninasdaq commented on issue #834: Question: S3 file get moved on failure also

Posted by GitBox <gi...@apache.org>.
ruchirvaninasdaq commented on issue #834:
URL: https://github.com/apache/camel-kafka-connector/issues/834#issuecomment-756393630


   Adding the serializer code as well, in case it helps:
   ```
    import java.io.IOException;
    import java.util.Map;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.common.serialization.Serializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import software.amazon.awssdk.core.ResponseInputStream;
    import software.amazon.awssdk.services.s3.model.GetObjectResponse;

    public class S3ObjectAvroSerializer implements Serializer<ResponseInputStream<GetObjectResponse>> {
   
       private static final Logger LOG = LoggerFactory.getLogger(S3ObjectAvroSerializer.class);
       private Schema schema;
       private GenericRecordSerializer recordSerializer;
        private MessageFactory messageFactory;
    
        public S3ObjectAvroSerializer(Schema schema) {
            this.schema = schema;
            this.recordSerializer = new GenericRecordSerializer(this.schema);
            this.messageFactory = new MessageFactory();
        }
    
        public S3ObjectAvroSerializer() throws IOException {
            Schema.Parser parser = new Schema.Parser();
            this.schema = parser.parse(getClass().getResourceAsStream("/avro/schema.avsc"));
            this.recordSerializer = new GenericRecordSerializer(this.schema);
            this.messageFactory = new MessageFactory();
        }
   
       /**
        * Create a Kafka serializer for control schema messages.
        */
       public Serializer<GenericRecord> getSerializer() {
           return new GenericRecordSerializer(this.schema);
       }
   
       @Override
       public void configure(Map<String, ?> configs, boolean isKey) {
       }
   
        @Override
        public byte[] serialize(String topic, ResponseInputStream<GetObjectResponse> inputStream) {
            GenericRecord record = null;
            try {
                record = messageFactory.parseMessage(inputStream);
            } catch (Exception e) {
                // Note: on a parse failure, record stays null and is still handed to the
                // record serializer, so the failure surfaces at the Kafka Connect level.
                LOG.error("Error in serializer", e);
            }
            return recordSerializer.doSerialize(topic, record);
        }
   
       @Override
       public void close() {
   
       }
   }
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] oscerd commented on issue #834: Question: S3 file get moved on failure also

Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #834:
URL: https://github.com/apache/camel-kafka-connector/issues/834#issuecomment-756793954


   If you still want the file to remain in the source bucket, you should avoid delete-after-read and move-after-read and use the idempotency support introduced in 0.7. The error is related to the broker, so it must be managed at the Kafka level, but I don't think that is easy to do. If the cluster goes down but Camel has already completed the exchange and moved the file to the destination bucket, there is no way to roll back the situation from a Camel perspective.
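   
   A minimal sketch of that suggestion, assuming the 0.7.x property names (the `camel.idempotency.*` and `camel.source.endpoint.*` keys below are assumptions to be verified against the connector documentation):
   
   ```
   config:
     # keep the object in the source bucket: no move and no delete after read
     camel.source.endpoint.moveAfterRead: false
     camel.source.endpoint.deleteAfterRead: false
     # idempotency support (introduced in 0.7) so already-consumed objects are skipped
     camel.idempotency.enabled: true
     camel.idempotency.repository.type: kafka
     camel.idempotency.expression.type: header
   ```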


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] oscerd commented on issue #834: Question: S3 file get moved on failure also

Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #834:
URL: https://github.com/apache/camel-kafka-connector/issues/834#issuecomment-756412993


   The failure in serialization happens at the Kafka level, so it is expected to
   find the file in the destination bucket; in terms of pure Camel, the exchange
   is complete. The error should be managed at the Kafka level. Also, you should
   use the individual options rather than camel.source.url, and only one or the
   other, not both; camel.source.url is not the usually suggested approach.
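   
   For illustration, the endpoint from the question expressed as individual options instead of camel.source.url would look roughly like this (a sketch; the camel.source.path.* / camel.source.endpoint.* prefixes are the assumed convention, so check the aws2-s3 source connector docs for the exact keys):
   
   ```
   camel.source.path.bucketNameOrArn: source-bucket
   camel.source.endpoint.useDefaultCredentialsProvider: true
   camel.source.endpoint.moveAfterRead: true
   camel.source.endpoint.destinationBucket: destination-bucket
   ```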
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] oscerd commented on issue #834: Question: S3 file get moved on failure also

Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #834:
URL: https://github.com/apache/camel-kafka-connector/issues/834#issuecomment-756823432


   Well, if the error happens during Camel routing we need to manage it a bit better with a rollback, but in your case this won't change the final outcome: if the cluster goes down after the message has been consumed but before it reaches the Kafka topic, you'll lose the message. This is true even if you handle the error through Kafka Connect options. We may try to reproduce this scenario, but it's not easy.
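   
   For reference, the Kafka Connect error-handling options alluded to above are roughly the following (a sketch; whether they cover a converter failure on the source side in your Connect version is something to verify):
   
   ```
   # tolerate conversion/transformation failures instead of failing the task
   errors.tolerance: all
   errors.log.enable: true
   errors.log.include.messages: true
   errors.retry.timeout: 60000
   errors.retry.delay.max.ms: 5000
   ```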


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] ruchirvaninasdaq edited a comment on issue #834: Question: S3 file get moved on failure also

Posted by GitBox <gi...@apache.org>.
ruchirvaninasdaq edited a comment on issue #834:
URL: https://github.com/apache/camel-kafka-connector/issues/834#issuecomment-756419315


   Thanks for the feedback. I will update my config. 
   
   I have seen that if the Kafka cluster goes down, the exchange is still marked as completed. What is a better way to handle Kafka exceptions with the Kafka connector? (My cluster was up and processing, but when it went down, the first messages were marked as Exchange completed; later messages started failing, but that first message did not fail and it never made it to the Kafka topic.)
   
   Thanks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


