You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/13 01:32:00 UTC

[jira] [Commented] (KAFKA-5142) KIP-145 - Expose Record Headers in Kafka Connect

    [ https://issues.apache.org/jira/browse/KAFKA-5142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16288584#comment-16288584 ] 

ASF GitHub Bot commented on KAFKA-5142:
---------------------------------------

GitHub user rhauch opened a pull request:

    https://github.com/apache/kafka/pull/4319

    [WIP] KAFKA-5142: Add Connect support for message headers (KIP-145)

    *NEW PROPOSAL FOR KIP-145... DO NOT MERGE*
    
    Changed the Connect API and runtime to support message headers as described in KIP-145.
    
    The new `Header` interface defines an immutable representation of a Kafka header (key-value pair) with support for the Connect value types and schemas. This interface provides methods for easily converting between many of the built-in primitive, structured, and logical data types.
    
    The new `Headers` interface defines an ordered collection of headers and is used to track all headers associated with a `ConnectRecord` (and thus `SourceRecord` and `SinkRecord`). This does allow multiple headers with the same key. The `Headers` contains methods for adding, removing, finding, and modifying headers. Convenience methods allow connectors and transforms to easily use and modify the headers for a record.
    
    A new `HeaderConverter` interface is also defined to enable the Connect runtime framework to be able to serialize and deserialize headers between the in-memory representation and Kafka’s byte[] representation. A new `SimpleHeaderConverter` implementation has been added, and this serializes to strings and deserializes by inferring the schemas (`Struct` header values are serialized without the schemas, so they can only be deserialized as `Map` instances without a schema.) The `StringConverter`, `JsonConverter`, and `ByteArrayConverter` have all been extended to also be `HeaderConverter` implementations. Each connector can be configured with a different header converter, although by default the `SimpleHeaderConverter` is used to serialize header values as strings without schemas.
    
    Unit and integration tests are added for `ConnectHeader` and `ConnectHeaders`, the two implementation classes for headers. Additional test methods are added for the methods added to the `Converter` implementations. Finally, the `ConnectRecord` object is already used heavily, so only limited tests need to be added while quite a few of the existing tests already cover the changes.
    
    ### Committer Checklist (excluded from commit message)
    - [ ] Verify design and implementation matches KIP-145
    - [ ] Verify test coverage and CI build status
    - [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rhauch/kafka kafka-5142-b

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/4319.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4319
    
----
commit 1c35692da19f3c8c92ce60946a69f576878b958a
Author: Randall Hauch <rh...@gmail.com>
Date:   2017-12-05T17:05:00Z

    KAFKA-5142: Add message headers to Connect API (KIP-145)
    
    Changed the Connect API to add message headers as described in KIP-145.
    
    The new `Header` interface defines an immutable representation of a Kafka header (name-value pair) with support for the Connect value types and schemas. Kafka headers have a string name and a binary value, which doesn’t align well with Connect’s existing data and schema mechanisms. Thus, Connect’s `Header` interface provides methods for easily converting between many of the built-in primitive, structured, and logical data types. And, as discussed below, a new `HeaderConverter` interface is added to define how the Kafka header binary values are converted to Connect data objects.
    
    The new `Headers` interface defines an ordered collection of headers and is used to track all headers associated with a `ConnectRecord`. Like the Kafka headers API, the Connect `Headers` interface allows storing multiple headers with the same key in an ordered list. The Connect `Headers` interface is mutable and has a number of methods that make it easy for connectors and transformations to add, modify, and remove headers from the record, and the interface is designed to allow chaining multiple mutating methods.
    
    The existing constructors and methods in `ConnectRecord`, `SinkRecord`, and `SourceRecord` are unchanged to maintain backward compatibility, and in these situations the records will contain an empty `Headers` object that connectors and transforms can modify. There is also an additional constructor that allows an existing `Headers` to be passed in. A new overloaded form of `newRecord` method was created to allow connectors and transforms to create a new record with an entirely new `Headers` object.
    
    A new `HeaderConverter` interface is also defined to enable the Connect runtime framework to be able to serialize and deserialize headers between the in-memory representation and Kafka’s byte[] representation.
    
    Unit and integration tests are added for `ConnectHeader` and `ConnectHeaders`, the two implementation classes for headers. The `ConnectRecord` object is already used heavily, so only limited tests need to be added while quite a few of the existing tests already cover the changes. However, new unit tests were added for `SinkRecord` and `SourceRecord to verify the header behavior, including when the `newRecord` methods are called.

commit f398eba326d6c0cc8732770cb3bfc962f0453995
Author: Randall Hauch <rh...@gmail.com>
Date:   2017-12-13T01:27:26Z

    KAFKA-5142: Add message header converters to Connect API (KIP-145)
    
    This is the second commit for the public Connect API changes for KIP-145, and deals primarily with `HeaderConverter` implementations.
    
    Connect has three `Converter` implementations, `StringConverter`, `JsonConverter` and `ByteArrayConverter`. These were modified to also implement `HeaderConverter`, without changing any of the existing functionality.
    
    Like many of our pluggable components in Connect, the `HeaderConverter` interface extends `Configurable` that allows implementations to expose a `ConfigDef` that describes the supported configuration properties, and a `config` method that can be used to initialize the component with provided configuration properties. The `StringConverter`, `JsonConverter` and `ByteArrayConverter` were changed to support these methods in a backward compatible manner. There are now `StringConverterConfig` and `JsonConverterConfig` classes that define the `ConfigDef` for the implementations; the `ByteArrayConverter` has no configuration properties and doesn't need a config class.
    
    Note that the existing `Converter` interface has a special `config` signature with a parameter that sas whether the converter is being used for keys or values. This is different than the `Configurable.config` signature, so this commit adds new `ConverterConfig` abstract class that defines a `converter.type` property that can be used to set whether the converter is being used for keys, values, or headers. The existing `Converter` methods internally set this property based upon the supplied boolean parameter, so the default for `converter.type` can be `header`.

commit 14cf25a957ce1a7f0207f3fbdc9da5a30d5f3488
Author: Randall Hauch <rh...@gmail.com>
Date:   2017-12-13T01:28:44Z

    KAFKA-5142: Add message headers to Connect runtime (KIP-145)
    
    This is the third commit for KIP-145 and changes the Connect runtime to support headers. Each Connect worker now configures a `HeaderConverter` for each connector task, in the same way it creates key and value `Converter` instances. This is entirely backward compatible, so that existing worker and connector configurations will work without changes. By default, the worker will use the `SimpleHeaderConverter` to serialize header values as strings and to deserialize them by inferring the schemas.

----


> KIP-145 - Expose Record Headers in Kafka Connect
> ------------------------------------------------
>
>                 Key: KAFKA-5142
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5142
>             Project: Kafka
>          Issue Type: New Feature
>          Components: clients
>            Reporter: Michael Andre Pearce
>            Assignee: Michael Andre Pearce
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect
> As KIP-82 introduced Headers into the core Kafka Product, it would be advantageous to expose them in the Kafka Connect Framework.
> Connectors that replicate data between Kafka cluster or between other messaging products and Kafka would want to replicate the headers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)