Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/10 22:36:00 UTC

[jira] [Commented] (KAFKA-6895) Schema Inferencing for JsonConverter

    [ https://issues.apache.org/jira/browse/KAFKA-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16471242#comment-16471242 ] 

ASF GitHub Bot commented on KAFKA-6895:
---------------------------------------

natengall opened a new pull request #5001: KAFKA-6895: Schema Inferencing for JsonConverter
URL: https://github.com/apache/kafka/pull/5001
 
 
   Introduce a new configuration for the JsonConverter class called "schemas.infer.enable". When this flag is set to "false", the converter keeps its existing behavior. When it is set to "true", the converter infers the schema from the contents of the JSON record and returns it as part of the SchemaAndValue object for Sink Connectors.
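   
   For illustration, here is a minimal sketch of how a sink-side value converter might be set up with the proposed flag. The key name "schemas.infer.enable" comes from this PR; the class and variable names below are illustrative, and the interaction with the existing "schemas.enable" setting is an assumption rather than something this description spells out.
   
      import java.nio.charset.StandardCharsets;
      import java.util.HashMap;
      import java.util.Map;
      import org.apache.kafka.connect.data.SchemaAndValue;
      import org.apache.kafka.connect.json.JsonConverter;
      
      public class InferenceConfigExample {
          public static void main(String[] args) {
              Map<String, Object> config = new HashMap<>();
              config.put("schemas.enable", false);       // assumption: plain JSON, no "schema"/"payload" envelope
              config.put("schemas.infer.enable", true);  // proposed flag: infer the schema from the record itself
      
              JsonConverter converter = new JsonConverter();
              converter.configure(config, false);        // false = this is a value converter, not a key converter
      
              byte[] record = "{\"c1\": 10000, \"c2\": \"bar\"}".getBytes(StandardCharsets.UTF_8);
              SchemaAndValue result = converter.toConnectData("my-topic", record);
      
              // With inference enabled, result.schema() would be an inferred STRUCT
              // schema instead of null, so schema-requiring sinks can consume it.
              System.out.println(result.schema());
          }
      }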
   
   Author: Allen Tang <na...@gmail.com>
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Schema Inferencing for JsonConverter
> ------------------------------------
>
>                 Key: KAFKA-6895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6895
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Allen Tang
>            Priority: Minor
>
> Although the connect-json library does provide a converter called "JsonConverter", it is limited in the range of JSON payloads it can handle on the Sink Connector side when deserializing them into Kafka Connect datatypes. When reading byte arrays from Kafka, the JsonConverter expects its input to be a JSON envelope containing the fields "schema" and "payload"; otherwise it throws a DataException reporting:
>  "JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration."
>  (when schemas.enable is true) or
>  "JSON value converted to Kafka Connect must be in envelope containing schema"
>  (when schemas.enable is false)
>  For example, if your JSON payload looks something like this:
> { "c1": 10000, "c2": "bar", "create_ts": 1501834166000, "update_ts": 1501834166000 }
> This is not compatible with Sink Connectors that require the schema for data ingest when mapping from Kafka Connect datatypes to, for example, JDBC datatypes. Instead, the data is expected to be structured like so:
> { "schema": { "type": "struct",
>     "fields": [
>       { "type": "int32", "optional": true, "field": "c1" },
>       { "type": "string", "optional": true, "field": "c2" },
>       { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" },
>       { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }
>     ],
>     "optional": false, "name": "foobar" },
>   "payload": { "c1": 10000, "c2": "bar", "create_ts": 1501834166000, "update_ts": 1501834166000 } }
> The "schema" is a necessary component in order to dictate to the JsonConverter how to map the payload's JSON datatypes to Kafka Connect datatypes on the consumer side.
> Introduce a new configuration for the JsonConverter class called "schemas.infer.enable". When this flag is set to "false", the existing behavior is exhibited. When it's set to "true", infer the schema from the contents of the JSON record, and return that as part of the SchemaAndValue object for Sink Connectors.
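
As a rough illustration of the proposal (not the actual implementation in the linked PR, and with illustrative names), inferring a schema amounts to walking the parsed JSON tree and mapping each node type onto a Kafka Connect schema type:

    import com.fasterxml.jackson.databind.JsonNode;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    public class SchemaInferenceSketch {

        // Derive a Connect Schema from a Jackson JsonNode by inspecting node types.
        public static Schema inferSchema(JsonNode node) {
            if (node.isInt()) {
                return Schema.OPTIONAL_INT32_SCHEMA;
            } else if (node.isLong()) {
                return Schema.OPTIONAL_INT64_SCHEMA;
            } else if (node.isFloat() || node.isDouble()) {
                return Schema.OPTIONAL_FLOAT64_SCHEMA;
            } else if (node.isBoolean()) {
                return Schema.OPTIONAL_BOOLEAN_SCHEMA;
            } else if (node.isTextual()) {
                return Schema.OPTIONAL_STRING_SCHEMA;
            } else if (node.isArray()) {
                // Assumption: homogeneous arrays; take the element schema from the first element.
                Schema elementSchema = node.size() > 0
                        ? inferSchema(node.get(0))
                        : Schema.OPTIONAL_STRING_SCHEMA;
                return SchemaBuilder.array(elementSchema).optional().build();
            } else if (node.isObject()) {
                // Each object becomes a STRUCT whose fields are inferred recursively.
                SchemaBuilder builder = SchemaBuilder.struct().optional();
                node.fields().forEachRemaining(
                        entry -> builder.field(entry.getKey(), inferSchema(entry.getValue())));
                return builder.build();
            }
            // Fallback for nulls and anything not covered above.
            return Schema.OPTIONAL_STRING_SCHEMA;
        }
    }

Applied to the plain payload above, this would yield an optional STRUCT with int32, string, and int64 fields, which is the kind of SchemaAndValue a schema-requiring sink such as the JDBC connector can ingest; note that inference alone would type the timestamps as plain int64 rather than the logical Timestamp type shown in the envelope example.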



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)