Posted to dev@pulsar.apache.org by Christophe Bornet <bo...@gmail.com> on 2022/06/02 14:56:53 UTC

[DISCUSS] PIP-173 : Create a built-in Function implementing the most common basic transformations

Dear Pulsar community,

I opened PIP-173 https://github.com/apache/pulsar/issues/15902 to create a
built-in Function implementing the most common basic transformations.

Let me know what you think.

Best regards,

Christophe

------

## Motivation

Currently, when users want to modify the data in Pulsar, they need to write
a Function.
For a lot of use cases, it would be handy for them to be able to use a
ready-made built-in Function that implements the most common basic
transformations like the ones available in
[Kafka Connect’s SMTs](https://docs.confluent.io/platform/current/connect/transforms/overview.html).
This relieves users of the burden of writing the Function themselves, of
having to understand the quirks of Pulsar Schemas, and of coding in a
language that they may not master (probably Java if they want to do advanced
stuff). They also benefit from battle-tested, maintained,
performance-optimised code.

## Goal

This PIP is about providing a `TransformFunction` that executes a sequence
of basic transformations on the data.
The `TransformFunction` shall be easy to configure and launchable as a
built-in NAR.
The `TransformFunction` shall be able to apply a sequence of common
transformations in-memory so we don’t need to execute the
`TransformFunction` multiple times and read/write to a topic each time.

This PIP is not about appending such a Function to a Source or a Sink.
While this is the ultimate goal, so we can provide an experience similar to
Kafka SMTs and avoid a read/write to a topic, this work will be done in a
future PIP.
It is expected that the code written for this PIP will be reusable in this
future work.

## API Changes

This PIP will introduce a new `transform` module in the `pulsar-function`
multi-module project. The produced artifact will be a NAR of the
`TransformFunction`.

## Implementation

When it processes a record, `TransformFunction` will:

* Create a mutable `TransformContext` structure that contains:

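```java
import java.util.Map;
import lombok.Data;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.functions.api.Context;

@Data
public class TransformContext {
    private Context context;
    // keySchema and keyObject are null when the record is not a KeyValue.
    private Schema<?> keySchema;
    private Object keyObject;
    private boolean keyModified;
    private Schema<?> valueSchema;
    private Object valueObject;
    private boolean valueModified;
    private KeyValueEncodingType keyValueEncodingType;
    private String key;
    private Map<String, String> properties;
    private String outputTopic;
}
```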

If the record is a `KeyValue`, the key and value schemas and objects are
unpacked. Otherwise the `keySchema` and `keyObject` are null.

* Call, in sequence, the `process` method of a series of `TransformStep`s on
this `TransformContext`

```java
public interface TransformStep {
    void process(TransformContext transformContext) throws Exception;
}
```

Each `TransformStep` can then modify the `TransformContext` as needed.

* Call the `send()` method of the `TransformContext`, which will create the
message to send to the `outputTopic`, repacking the `KeyValue` if needed.
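
The three steps above can be sketched without any Pulsar dependency. This is only an illustration under stated assumptions: `SketchContext`, `SketchStep`, and `SketchPipeline` are hypothetical stand-ins (plain maps replace Pulsar schemas and objects, and `send()` only records what it would publish), not the classes the PIP will ship.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TransformContext: plain maps instead of schemas and objects.
class SketchContext {
    Map<String, Object> key;                               // null when the record is not a KeyValue
    Map<String, Object> value = new LinkedHashMap<>();
    Map<String, String> properties = new LinkedHashMap<>();
    String outputTopic;
    final List<String> sent = new ArrayList<>();           // what send() would have published

    // Stand-in for send(): a real implementation would build and publish a Pulsar message.
    void send() {
        String payload = (key == null)
                ? value.toString()
                : "{key=" + key + ", value=" + value + "}"; // "repack" key and value
        sent.add(outputTopic + " <- " + payload);
    }
}

// Same shape as the TransformStep interface, bound to the stand-in context.
interface SketchStep {
    void process(SketchContext context) throws Exception;
}

class SketchPipeline {
    // Applies every step in order to the same mutable context, then sends once.
    static void run(SketchContext context, List<SketchStep> steps) throws Exception {
        for (SketchStep step : steps) {
            step.process(context);
        }
        context.send();
    }
}
```

Because all steps mutate one in-memory context, the record is written to the output topic only once, however many steps are configured.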

The `TransformFunction` will read its configuration as JSON from
`userConfig` in the format:

```json
{
  "steps": [
    {
      "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
    },
    {
      "type": "merge-key-value"
    },
    {
      "type": "unwrap-key-value"
    },
    {
      "type": "cast", "schema-type": "STRING"
    }
  ]
}
```

Each step is defined by its `type` and uses its own arguments.
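
As a rough sketch of how such a configuration could be read: a Pulsar Function receives its user config as a `Map<String, Object>` (for example through `Context.getUserConfigMap()`), so the example below works on an already-deserialized map. `StepSpec`, `StepConfigParser`, and `splitFields` are hypothetical names introduced for illustration, and treating `fields` as a comma-separated string is inferred from the example above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical parsed form of one entry of "steps": its type plus its own arguments.
class StepSpec {
    final String type;
    final Map<String, Object> args;

    StepSpec(String type, Map<String, Object> args) {
        this.type = type;
        this.args = args;
    }
}

class StepConfigParser {
    // Reads the "steps" list from the user config and validates that each step has a type.
    static List<StepSpec> parse(Map<String, Object> userConfig) {
        Object rawSteps = userConfig.get("steps");
        if (!(rawSteps instanceof List)) {
            throw new IllegalArgumentException("\"steps\" must be a list");
        }
        List<StepSpec> specs = new ArrayList<>();
        for (Object rawStep : (List<?>) rawSteps) {
            if (!(rawStep instanceof Map)) {
                throw new IllegalArgumentException("each step must be an object");
            }
            Map<String, Object> args = new LinkedHashMap<>();
            Object type = null;
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) rawStep).entrySet()) {
                String name = String.valueOf(entry.getKey());
                if (name.equals("type")) {
                    type = entry.getValue();
                } else {
                    args.put(name, entry.getValue());   // step-specific argument
                }
            }
            if (!(type instanceof String)) {
                throw new IllegalArgumentException("each step needs a string \"type\"");
            }
            specs.add(new StepSpec((String) type, args));
        }
        return specs;
    }

    // Splits a comma-separated "fields" argument; returns an empty list if it is absent.
    static List<String> splitFields(Object rawFields) {
        List<String> fields = new ArrayList<>();
        if (rawFields instanceof String) {
            for (String field : ((String) rawFields).split(",")) {
                String trimmed = field.trim();
                if (!trimmed.isEmpty()) {
                    fields.add(trimmed);
                }
            }
        }
        return fields;
    }
}
```

Validating the whole list up front means a misconfigured step is reported before any record is processed.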

This example config, applied to a `KeyValue<AVRO, AVRO>` input record with
value `{key={keyField1: key1, keyField2: key2, keyField3: key3},
value={valueField1: value1, valueField2: value2, valueField3: value3}}`,
will give the following after each step:
```
{key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
           |
           | "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
           |
{key={keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
           |
           | "type": "merge-key-value"
           |
{key={keyField3: key3}, value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
           |
           | "type": "unwrap-key-value"
           |
{keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO)
           |
           | "type": "cast", "schema-type": "STRING"
           |
{"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING)
```
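
The trace above can be reproduced with a small, dependency-free sketch. It uses ordered maps of strings as stand-ins for AVRO records, so it illustrates only the data flow, not the schema handling. `TransformTrace` and its methods are hypothetical names. Two behaviours are assumptions: merged key fields come before value fields (as in the trace), and value fields win on a name clash.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class TransformTrace {
    // drop-fields: removes the named fields from one part (key or value) of the record.
    static void dropFields(Map<String, String> part, String... fields) {
        for (String field : fields) {
            part.remove(field);
        }
    }

    // merge-key-value: copies the key fields into the value, key fields first.
    // Assumption: on a name clash the value field overwrites the key field.
    static Map<String, String> mergeKeyValue(Map<String, String> key, Map<String, String> value) {
        Map<String, String> merged = new LinkedHashMap<>(key);
        merged.putAll(value);
        return merged;
    }

    // cast to STRING: renders the record as JSON text.
    // Assumption: names and values contain no characters that need JSON escaping.
    static String castToString(Map<String, String> record) {
        StringJoiner json = new StringJoiner(", ", "{", "}");
        for (Map.Entry<String, String> field : record.entrySet()) {
            json.add("\"" + field.getKey() + "\": \"" + field.getValue() + "\"");
        }
        return json.toString();
    }

    // Runs the four configured steps on the example record and returns the final string.
    static String run() {
        Map<String, String> key = new LinkedHashMap<>();
        key.put("keyField1", "key1");
        key.put("keyField2", "key2");
        key.put("keyField3", "key3");
        Map<String, String> value = new LinkedHashMap<>();
        value.put("valueField1", "value1");
        value.put("valueField2", "value2");
        value.put("valueField3", "value3");

        dropFields(key, "keyField1", "keyField2");   // "drop-fields" with "part": "key"
        value = mergeKeyValue(key, value);           // "merge-key-value"
        Map<String, String> unwrapped = value;       // "unwrap-key-value": keep only the value
        return castToString(unwrapped);              // "cast" with "schema-type": "STRING"
    }
}
```

Calling `TransformTrace.run()` returns the same string as the last line of the trace.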

`TransformFunction` will be built as a NAR including a `pulsar-io.yaml`
service file so it can be registered as a built-in function with name
`transform`.

## Rejected Alternatives

None

Re: [DISCUSS] PIP-173 : Create a built-in Function implementing the most common basic transformations

Posted by Christophe Bornet <bo...@gmail.com>.
Hi Neng,

See the comment from Enrico in the PIP issue
<https://github.com/apache/pulsar/issues/15902>. We've put this in
"Rejected alternatives" for the reasons that:

   - it won't be easily available to all Pulsar users
   - it would be hard to guarantee compatibility with many Pulsar versions,
   and the Transformations will use many advanced features of Pulsar APIs

I agree with Enrico that we should maintain this function in the main repo.
I'll also add that:

   - This is a key feature that Kafka has
   <https://docs.confluent.io/platform/current/connect/transforms/overview.html>
   and that is lacking in Pulsar
   - A lot of users are asking for it (especially CDC users)
   - The next step will be to be able to chain this function with a
   Sink/Source to have an experience similar to Kafka

I hope this answers your concerns about this PIP.

Best regards

Christophe

On Wed, Jun 8, 2022 at 19:43, Neng Lu <fr...@gmail.com> wrote:

> I would suggest first having some concrete implementations in a separate
> repo.
> After verifying its functionality and performance, then we can move into
> the main pulsar repo.
>
> On Fri, Jun 3, 2022 at 5:09 AM Enrico Olivelli <eo...@gmail.com>
> wrote:
>
> > Overall I agree with the proposal.
> > I left some minor feedback on the issue
> >
> > Thank you
> >
> > Enrico
> >
>
>
> --
> Best Regards,
> Neng
>

Re: [DISCUSS] PIP-173 : Create a built-in Function implementing the most common basic transformations

Posted by Neng Lu <fr...@gmail.com>.
I would suggest first having some concrete implementations in a separate
repo.
After verifying its functionality and performance, then we can move into
the main pulsar repo.

On Fri, Jun 3, 2022 at 5:09 AM Enrico Olivelli <eo...@gmail.com> wrote:

> Overall I agree with the proposal.
> I left some minor feedback on the issue
>
> Thank you
>
> Enrico
>


-- 
Best Regards,
Neng

Re: [DISCUSS] PIP-173 : Create a built-in Function implementing the most common basic transformations

Posted by Enrico Olivelli <eo...@gmail.com>.
Overall I agree with the proposal.
I left some minor feedback on the issue

Thank you

Enrico
