You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/25 07:52:52 UTC

[GitHub] [flink] dawidwys commented on pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

dawidwys commented on pull request #12056:
URL: https://github.com/apache/flink/pull/12056#issuecomment-633433806


   Hey,
   Thanks for working on this. I was wondering if we could/should replace the `RMQDeserializedMessage` with an emitter parameter pattern (https://www.morling.dev/blog/emitter-parameter-pattern-for-flexible-spis/).
   
   If we changed the `org.apache.flink.streaming.connectors.rabbitmq.RMQDeserializationSchema#processMessage` to 
   
   ```
   void processMessage(
       Envelope envelope, 
       AMQP.BasicProperties properties, 
       byte[] body,
       RMQCollector<T> collector
   ) throws IOException;
   
   interface RMQCollector<T> {
       void collect(T record);
       void setCorrelationId(String correlationId); // throw exception if called multiple times
   }
   ```
   
   I think we could wrap the `DeserializationSchema` directly in that interface (similar as in other connectors).
   
   The biggest benefit is that we do not need to buffer the elements in arrays, thus we can create less objects on the hot path. There is the downside that the contract on `setCorrelationId`, that it is called once is not strictly enforced, but I think it should be fine here.
   
   The remaining question is if we want to support the `isEndOfStream` in the new schema. If we do not and go with the wrapper approach, we would have to find a workaround for it. At the same time this would be a regression compared to the current schema. Even if that method has a couple of problems on its own.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org