Posted to issues@flink.apache.org by medcv <gi...@git.apache.org> on 2018/07/04 23:15:23 UTC

[GitHub] flink pull request #6259: [FLINK-9679] Implement AvroSerializationSchema

GitHub user medcv opened a pull request:

    https://github.com/apache/flink/pull/6259

    [FLINK-9679] Implement AvroSerializationSchema

    ## What is the purpose of the change
    
    Provides an implementation of AvroSerializationSchema that writes records serialized as Avro, and also provides a version that uses the Confluent Schema Registry to write the record.
    
    This follows the implementation patterns of AvroDeserializationSchema to keep the code base consistent for serialization and deserialization.
    
    ## Brief change log
    
    - Implemented AvroSerializationSchema / RegistryAvroSerializationSchema / ConfluentRegistryAvroSerializationSchema
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (yes)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/medcv/flink FLINK-9679

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6259.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6259
    
----
commit 975d1b3a774c11acf01ff2533876d32afbb670b0
Author: Yadan.JS <y_...@...>
Date:   2018-06-28T16:39:30Z

    [FLINK-9679] Implement AvroSerializationSchema

----


---

[GitHub] flink issue #6259: [FLINK-9679] Implement AvroSerializationSchema

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/6259
  
    @medcv the new commit does not address any of my previous comments or I don't understand something.


---

[GitHub] flink issue #6259: [FLINK-9679] Implement AvroSerializationSchema

Posted by medcv <gi...@git.apache.org>.
Github user medcv commented on the issue:

    https://github.com/apache/flink/pull/6259
  
    @dawidwys  Thanks!
    
    As far as I could tell from digging into the Confluent code, their API needs a `subject` to retrieve the schema ID and version, and it should be provided by the consumer.
    
    https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClient.java#L30
    
    The purpose of the new commit is to address your first comment by removing the `topic` name from the serializer's constructor and replacing it with `subject`. This way the serializer doesn't need to know the `topic` name.
    
    If you still see issues with this approach, I would appreciate your help in finding a better solution.
    
    
     



---

[GitHub] flink issue #6259: [FLINK-9679] Implement AvroSerializationSchema

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/6259
  
    Actually, I suggested moving the whole schema-writing logic into the coder, so the coder should have a method like `writeSchema(..., OutputStream out)`.
    
    Substituting `subject` for the previous `topic + "-value"` does not change anything; it simply renames a lookup key. The real problem is that the key is static and does not take into account the actual topic the message will be written to. Unfortunately, I am afraid fixing this would require some changes in `FlinkKafkaProducer` itself.
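
The static-key problem raised in this comment can be illustrated with a small, self-contained sketch. It assumes the Confluent naming convention in which the value subject is `<topic>-value`. The class and method names (`SubjectSketch`, `staticSubject`, `subjectForTopic`) are hypothetical and are not part of Flink or the Confluent client.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch, not Flink or Confluent code.
final class SubjectSketch {

    // A subject fixed at construction time: every record resolves to the
    // same registry key, regardless of the topic it is written to.
    static Function<String, String> staticSubject(String subject) {
        return topic -> subject;
    }

    // A subject derived per record from the target topic, following the
    // "<topic>-value" naming convention (an assumption of this sketch).
    static Function<String, String> subjectForTopic() {
        return topic -> topic + "-value";
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("orders", "payments");
        Function<String, String> fixed = staticSubject("orders-value");
        Function<String, String> dynamic = subjectForTopic();
        for (String topic : topics) {
            System.out.println(topic + ": static=" + fixed.apply(topic)
                    + ", dynamic=" + dynamic.apply(topic));
        }
    }
}
```

With a fixed subject, records routed to `payments` would still be looked up under `orders-value`. Deriving the subject per record needs the target topic at serialization time, which is why the comment expects changes on the producer side.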


---

[GitHub] flink issue #6259: [FLINK-9679] Implement AvroSerializationSchema

Posted by medcv <gi...@git.apache.org>.
Github user medcv commented on the issue:

    https://github.com/apache/flink/pull/6259
  
    @tillrohrmann I implemented `ConfluentRegistryAvroSerializationSchema` in this PR. I would appreciate it if you could review it.



---

[GitHub] flink issue #6259: [FLINK-9679] Implement AvroSerializationSchema

Posted by medcv <gi...@git.apache.org>.
Github user medcv commented on the issue:

    https://github.com/apache/flink/pull/6259
  
    @dawidwys Thanks a lot for the review.
    I will start looking into both problems that you've pointed out.
    The second point should be easy to fix, but for the first one I need to do some research to assess its feasibility.


---

[GitHub] flink issue #6259: [FLINK-9679] Implement AvroSerializationSchema

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/6259
  
    Hi @medcv ,
    First of all, thanks for the contribution. There are a couple of problems with this implementation, though:
    * ConfluentSchemaRegistry follows the idea that schemas are grouped by topic name. Providing the topic name in the constructor of `SerializationSchema` will break this assumption when used with a topic pattern. I think we should somehow enable passing the schema from `KafkaProducer` to `SerializationSchema`, but it would require far more changes.
    * You assume in `RegistryAvroSerializationSchema` that a schema is always represented by a single id (`int`). That is true only for the ConfluentSchemaRegistry; e.g. HortonworksSchemaRegistry uses a metadata id (`long`) and a version id (`int`). The idea of `SchemaCoder` was to abstract away all "vendor"-specific features.
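
The second point can be sketched as follows: if the serializer only asks a coder to write its schema reference to the output stream, the shape of the identifier stays vendor-specific. This is a minimal illustration, not the PR's code. `SchemaRefWriter`, `IntIdWriter`, `LongIdAndVersionWriter` and `SchemaHeaderSketch` are hypothetical names. The first layout follows the Confluent wire format (magic byte `0`, then a 4-byte big-endian schema id). The second layout is illustrative only and is not claimed to match the Hortonworks format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical vendor-neutral hook: the serializer never sees the id type.
interface SchemaRefWriter {
    void writeSchema(OutputStream out) throws IOException;
}

// Single int id, preceded by a zero magic byte (Confluent wire format).
final class IntIdWriter implements SchemaRefWriter {
    private final int schemaId;

    IntIdWriter(int schemaId) {
        this.schemaId = schemaId;
    }

    @Override
    public void writeSchema(OutputStream out) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeByte(0);       // magic byte
        data.writeInt(schemaId); // 4 bytes, big-endian
        data.flush();
    }
}

// Illustrative layout only: a long metadata id followed by an int version.
final class LongIdAndVersionWriter implements SchemaRefWriter {
    private final long metadataId;
    private final int version;

    LongIdAndVersionWriter(long metadataId, int version) {
        this.metadataId = metadataId;
        this.version = version;
    }

    @Override
    public void writeSchema(OutputStream out) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeLong(metadataId); // 8 bytes, big-endian
        data.writeInt(version);     // 4 bytes, big-endian
        data.flush();
    }
}

final class SchemaHeaderSketch {
    // Returns the header bytes that would precede the Avro payload.
    static byte[] header(SchemaRefWriter writer) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            writer.writeSchema(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(header(new IntIdWriter(42)).length);
        System.out.println(header(new LongIdAndVersionWriter(7L, 3)).length);
    }
}
```

The caller only depends on `writeSchema(OutputStream)`, so adding another registry means adding another writer rather than changing the serializer.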


---

[GitHub] flink issue #6259: [FLINK-9679] Implement AvroSerializationSchema

Posted by medcv <gi...@git.apache.org>.
Github user medcv commented on the issue:

    https://github.com/apache/flink/pull/6259
  
    @dawidwys Thanks for your suggestions.
    The new commit extends `SchemaCoder` with a `writeSchema` method that moves the schema-writing logic out of `AvroSerializationSchema`, as you suggested.
    
    I totally agree that a dynamic `subject` would make the implementation more generic. However, since this is a `Confluent`-specific implementation and the variable is only present in `ConfluentRegistryAvroSerializationSchema`, I think users of this class should be aware of how `Confluent` requires this variable when they set up their Kafka producer and Schema Registry.
    
    I am open to suggestions for fixing the issue (by changing `FlinkKafkaProducer`) if you still think this is a blocker for this PR.


---

[GitHub] flink issue #6259: [FLINK-9679] Implement AvroSerializationSchema

Posted by medcv <gi...@git.apache.org>.
Github user medcv commented on the issue:

    https://github.com/apache/flink/pull/6259
  
    @dawidwys I updated the PR, please review.
    
    The usage would look like this:
    `ConfluentRegistryAvroSerializationSchema.forSpecific(User.class, subject, schemaRegistryUrl)`
    since Confluent needs the "subject" to fetch the schema info. `ConfluentRegistryAvroSerializationSchema` now uses "subject" directly instead of `topic + "-value"`.


---

[GitHub] flink issue #6259: [FLINK-9679] Implement AvroSerializationSchema

Posted by medcv <gi...@git.apache.org>.
Github user medcv commented on the issue:

    https://github.com/apache/flink/pull/6259
  
    @dawidwys 
    In the last commit, I extended `SchemaCoder` with `getSchemaId`, as you suggested.


---

[GitHub] flink issue #6259: [FLINK-9679] Implement AvroSerializationSchema

Posted by medcv <gi...@git.apache.org>.
Github user medcv commented on the issue:

    https://github.com/apache/flink/pull/6259
  
    @dawidwys For the second issue, I am looking at other schema registries and trying to extend `SchemaCoder`.


---

[GitHub] flink issue #6259: [FLINK-9679] Implement AvroSerializationSchema

Posted by medcv <gi...@git.apache.org>.
Github user medcv commented on the issue:

    https://github.com/apache/flink/pull/6259
  
    @tillrohrmann Please review 


---