Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2021/01/07 21:23:44 UTC

[GitHub] [camel-kafka-connector] ruchirvaninasdaq commented on issue #834: Question: S3 file get moved on failure also

ruchirvaninasdaq commented on issue #834:
URL: https://github.com/apache/camel-kafka-connector/issues/834#issuecomment-756393630


   Adding the serializer code as well, in case that helps:
   ```
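   import java.io.IOException;
   import java.io.InputStream;
   import java.util.Map;
   
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.kafka.common.serialization.Serializer;
   import org.slf4j.Logger;          // assuming SLF4J is the logging API in use
   import org.slf4j.LoggerFactory;
   import software.amazon.awssdk.core.ResponseInputStream;
   import software.amazon.awssdk.services.s3.model.GetObjectResponse;
   
   // GenericRecordSerializer and MessageFactory are project classes (not shown here).
   public class S3ObjectAvroSerializer implements Serializer<ResponseInputStream<GetObjectResponse>> {
   
       private static final Logger LOG = LoggerFactory.getLogger(S3ObjectAvroSerializer.class);
       private Schema schema;
       private GenericRecordSerializer recordSerializer;
       private MessageFactory messageFactory;
   
       public S3ObjectAvroSerializer(Schema schema) {
           this.schema = schema;
           this.recordSerializer = new GenericRecordSerializer(this.schema);
           this.messageFactory = new MessageFactory();
       }
   
       public S3ObjectAvroSerializer() throws IOException {
           // Load the default Avro schema from the classpath.
           try (InputStream in = getClass().getResourceAsStream("/avro/schema.avsc")) {
               this.schema = new Schema.Parser().parse(in);
           }
           this.recordSerializer = new GenericRecordSerializer(this.schema);
           this.messageFactory = new MessageFactory();
       }
   
       /**
        * Create a Kafka serializer for control schema messages.
        */
       public Serializer<GenericRecord> getSerializer() {
           return new GenericRecordSerializer(this.schema);
       }
   
       @Override
       public void configure(Map<String, ?> configs, boolean isKey) {
           // No configuration needed.
       }
   
       @Override
       public byte[] serialize(String topic, ResponseInputStream<GetObjectResponse> inputStream) {
           GenericRecord record = null;
           try {
               record = messageFactory.parseMessage(inputStream);
           } catch (Exception e) {
               // A parse failure is only logged here; record stays null
               // and is still passed to doSerialize below.
               LOG.error("Error in Serializer", e);
           }
           return recordSerializer.doSerialize(topic, record);
       }
   
       @Override
       public void close() {
           // Nothing to release.
       }
   }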
   ```
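
   One thing I notice in the code above: the `catch` block in `serialize` only logs the exception, so whatever calls the serializer never sees the parse failure. I don't know yet whether that is related to the file being moved, but a variant that rethrows would look roughly like the sketch below. It uses only the JDK so it can be run on its own; `MessageParser`, `SerializationFailure` and `strictUtf8Parser` are hypothetical stand-ins (for `MessageFactory.parseMessage` and Kafka's `SerializationException`), not the real classes.

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.nio.charset.StandardCharsets;

   /** Sketch: propagate parse failures instead of logging and continuing with null. */
   public class FailFastSerializerSketch {

       /** Hypothetical stand-in for MessageFactory.parseMessage(...). */
       public interface MessageParser {
           String parse(InputStream in) throws IOException;
       }

       /** Hypothetical stand-in for Kafka's SerializationException (unchecked). */
       public static class SerializationFailure extends RuntimeException {
           public SerializationFailure(String message, Throwable cause) {
               super(message, cause);
           }
       }

       private final MessageParser parser;

       public FailFastSerializerSketch(MessageParser parser) {
           this.parser = parser;
       }

       public byte[] serialize(String topic, InputStream in) {
           String record;
           try {
               record = parser.parse(in);
           } catch (Exception e) {
               // Rethrow (with the cause attached) so the caller sees the failure.
               throw new SerializationFailure("Failed to parse object for topic " + topic, e);
           }
           return record.getBytes(StandardCharsets.UTF_8);
       }

       /** Example parser: rejects empty input, otherwise decodes the bytes as UTF-8. */
       public static MessageParser strictUtf8Parser() {
           return in -> {
               byte[] bytes = in.readAllBytes();
               if (bytes.length == 0) {
                   throw new IOException("empty object");
               }
               return new String(bytes, StandardCharsets.UTF_8);
           };
       }

       public static void main(String[] args) {
           FailFastSerializerSketch serializer = new FailFastSerializerSketch(strictUtf8Parser());
           byte[] ok = serializer.serialize("demo",
                   new ByteArrayInputStream("hello".getBytes(StandardCharsets.UTF_8)));
           System.out.println("serialized " + ok.length + " bytes");
           try {
               serializer.serialize("demo", new ByteArrayInputStream(new byte[0]));
           } catch (SerializationFailure e) {
               System.out.println("rejected: " + e.getCause().getMessage());
           }
       }
   }
   ```

   In the real serializer the same idea would be to throw `org.apache.kafka.common.errors.SerializationException` from the `catch` block. Whether the connector then leaves the S3 object in place is something I have not verified.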

