Posted to dev@kafka.apache.org by "Allen Tang (JIRA)" <ji...@apache.org> on 2018/05/10 21:52:00 UTC

[jira] [Created] (KAFKA-6895) Schema Inferencing for JsonConverter

Allen Tang created KAFKA-6895:
---------------------------------

             Summary: Schema Inferencing for JsonConverter
                 Key: KAFKA-6895
                 URL: https://issues.apache.org/jira/browse/KAFKA-6895
             Project: Kafka
          Issue Type: New Feature
          Components: KafkaConnect
            Reporter: Allen Tang


The connect-json library already provides a converter called "JsonConverter", but it accepts only a limited range of JSON payloads on the sink connector side when converting them into Kafka Connect data types. When reading byte arrays from Kafka, the JsonConverter expects each input to be a JSON envelope containing the fields "schema" and "payload"; otherwise it throws a DataException reporting:
    JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
(when schemas.enable is true) or
    JSON value converted to Kafka Connect must be in envelope containing schema
(when schemas.enable is false).
For example, suppose your JSON payload looks like this:
    { "c1": 10000, "c2": "bar", "create_ts": 1501834166000, "update_ts": 1501834166000 }
This payload is not compatible with sink connectors that require a schema for data ingest, for example when mapping from Kafka Connect data types to JDBC data types. Instead, the data is expected to be structured like so:
    {
      "schema": {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": true, "field": "c1" },
          { "type": "string", "optional": true, "field": "c2" },
          { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" },
          { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }
        ],
        "optional": false,
        "name": "foobar"
      },
      "payload": { "c1": 10000, "c2": "bar", "create_ts": 1501834166000, "update_ts": 1501834166000 }
    }


The "schema" field is required because it tells the JsonConverter how to map the payload's JSON data types to Kafka Connect data types on the consumer side.
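
The envelope requirement described above can be illustrated with a minimal sketch. It is written in Python for brevity and is not the actual JsonConverter source; the names split_envelope and DataException are hypothetical stand-ins, and the check only mirrors the behavior described in this report for schemas.enable=true.

```python
import json


class DataException(Exception):
    """Hypothetical stand-in for Kafka Connect's DataException."""


def split_envelope(raw: bytes):
    """Return (schema, payload) if raw is a JSON envelope, else raise.

    Assumption: an envelope is a JSON object with exactly the two
    fields "schema" and "payload", as the error message states.
    """
    value = json.loads(raw.decode("utf-8"))
    if not isinstance(value, dict) or set(value) != {"schema", "payload"}:
        raise DataException(
            'JsonConverter with schemas.enable requires "schema" and '
            '"payload" fields and may not contain additional fields.'
        )
    return value["schema"], value["payload"]
```

Under these assumptions, a plain record such as { "c1": 10000, "c2": "bar" } is rejected, while the same record wrapped in an envelope is split into its schema and payload.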

 

Introduce a new configuration for the JsonConverter class called "schemas.infer.enable". When this flag is set to "false", the converter keeps its existing behavior. When it is set to "true", the converter infers the schema from the contents of the JSON record and returns it as part of the SchemaAndValue object passed to sink connectors.
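
As a rough illustration of what such inference might look like, here is a minimal sketch in Python. It is not a proposed implementation: the functions infer_schema and wrap_in_envelope are hypothetical, and the type mapping rests on assumptions that this report does not specify (every integer becomes int64, every inferred field is marked optional, array element types come from the first element, and nulls and empty arrays are rejected).

```python
import json


def infer_schema(value, field=None):
    """Infer a Connect-style JSON schema description from a parsed value."""
    if isinstance(value, bool):      # test bool first: bool is a subclass of int
        schema = {"type": "boolean"}
    elif isinstance(value, int):
        schema = {"type": "int64"}   # assumption: always the widest integer type
    elif isinstance(value, float):
        schema = {"type": "float64"}
    elif isinstance(value, str):
        schema = {"type": "string"}
    elif isinstance(value, dict):
        schema = {
            "type": "struct",
            "fields": [infer_schema(v, k) for k, v in value.items()],
        }
    elif isinstance(value, list) and value:
        # assumption: the first element is representative of the whole array
        schema = {"type": "array", "items": infer_schema(value[0])}
    else:
        # null and empty arrays carry no type information in a single record
        raise ValueError(f"cannot infer a schema from {value!r}")
    schema["optional"] = True        # one record cannot prove a field is required
    if field is not None:
        schema["field"] = field
    return schema


def wrap_in_envelope(raw: bytes) -> dict:
    """Build a schema/payload envelope from a plain JSON record."""
    payload = json.loads(raw.decode("utf-8"))
    return {"schema": infer_schema(payload), "payload": payload}
```

Applied to the example payload above, this sketch yields a struct with four fields, but the result differs from the hand-written envelope: "c1" is inferred as int64 rather than int32, and "create_ts" and "update_ts" come out as plain int64 values because the org.apache.kafka.connect.data.Timestamp logical type cannot be recovered from the JSON alone. Any real implementation would need to decide how to handle such cases.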



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)