Posted to dev@pulsar.apache.org by Christophe Bornet <bo...@gmail.com> on 2022/06/02 14:56:53 UTC
[DISCUSS] PIP-173 : Create a built-in Function implementing the most common basic transformations
Dear Pulsar community,
I opened PIP-173 https://github.com/apache/pulsar/issues/15902 to create a
built-in Function implementing the most common basic transformations.
Let me know what you think.
Best regards,
Christophe
------
## Motivation
Currently, when users want to modify the data in Pulsar, they need to write
a Function.
For a lot of use cases, it would be handy for them to be able to use a
ready-made built-in Function that implements the most common basic
transformations like the ones available in
[Kafka Connect’s SMTs](https://docs.confluent.io/platform/current/connect/transforms/overview.html).
This relieves users of the burden of writing the Function themselves, of
having to understand the intricacies of Pulsar Schemas, and of coding in a
language that they may not master (probably Java if they want to do advanced
stuff). They also benefit from battle-tested, maintained,
performance-optimised code.
## Goal
This PIP is about providing a `TransformFunction` that executes a sequence
of basic transformations on the data.
The `TransformFunction` shall be easy to configure and launchable as a
built-in NAR.
The `TransformFunction` shall be able to apply a sequence of common
transformations in-memory so we don’t need to execute the
`TransformFunction` multiple times and read/write to a topic each time.
This PIP is not about appending such a Function to a Source or a Sink.
While this is the ultimate goal, so we can provide an experience similar to
Kafka SMTs and avoid a read/write to a topic, this work will be done in a
future PIP.
It is expected that the code written for this PIP will be reusable in this
future work.
## API Changes
This PIP will introduce a new `transform` module in the `pulsar-functions`
multi-module project. The produced artifact will be a NAR of the
`TransformFunction`.
## Implementation
When it processes a record, `TransformFunction` will:
* Create a mutable structure `TransformContext` that contains
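```java
import java.util.Map;
import lombok.Data;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.functions.api.Context;

@Data
public class TransformContext {
    private Context context;
    private Schema<?> keySchema;     // null unless the record is a KeyValue
    private Object keyObject;        // null unless the record is a KeyValue
    private boolean keyModified;
    private Schema<?> valueSchema;
    private Object valueObject;
    private boolean valueModified;
    private KeyValueEncodingType keyValueEncodingType;
    private String key;
    private Map<String, String> properties;
    private String outputTopic;
    // Methods such as send() are omitted here.
}
```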
If the record is a `KeyValue`, the key and value schemas and objects are
unpacked. Otherwise the `keySchema` and `keyObject` are null.
* Call in sequence the process method of a series of `TransformStep` on
this `TransformContext`
```java
public interface TransformStep {
    void process(TransformContext transformContext) throws Exception;
}
```
Each `TransformStep` can then modify the `TransformContext` as needed.
* Call the `send()` method of the `TransformContext`, which will create the
message to send to the `outputTopic`, repacking the `KeyValue` if needed.
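The three phases above (build the context, run the steps, send once) can be sketched without any Pulsar dependency. This is only an illustration under stated assumptions: `SketchContext`, `SketchStep` and `SketchFunction` are hypothetical names, records are modelled as plain maps instead of schema-typed objects, and the output topic is replaced by an in-memory list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for TransformContext: records are plain maps so the
// sketch compiles without any Pulsar dependency.
final class SketchContext {
    Map<String, Object> key;   // null when the record is not a KeyValue
    Map<String, Object> value;
    final List<String> sent = new ArrayList<>(); // stands in for the output topic

    SketchContext(Map<String, Object> key, Map<String, Object> value) {
        this.key = key == null ? null : new LinkedHashMap<>(key);
        this.value = new LinkedHashMap<>(value);
    }

    // Emits one output message, repacking key and value only when a key exists.
    void send() {
        sent.add(key == null ? value.toString() : "key=" + key + ", value=" + value);
    }
}

// Same shape as the PIP's TransformStep, but on the simplified context.
interface SketchStep {
    void process(SketchContext ctx) throws Exception;
}

final class SketchFunction {
    private final List<SketchStep> steps;

    SketchFunction(List<SketchStep> steps) {
        this.steps = steps;
    }

    SketchContext process(Map<String, Object> key, Map<String, Object> value) {
        SketchContext ctx = new SketchContext(key, value); // 1. build the context
        try {
            for (SketchStep step : steps) {                // 2. run steps in order
                step.process(ctx);
            }
        } catch (Exception e) {
            // Sketch-only choice: surface step failures as unchecked exceptions.
            throw new IllegalStateException("Transform step failed", e);
        }
        ctx.send();                                        // 3. emit once at the end
        return ctx;
    }
}
```

A step can then be written as a lambda, for example `ctx -> ctx.value.remove("a")`, and passed to `new SketchFunction(List.of(step))`. Because every step mutates the same context, the record is written out only once, however many steps are configured.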
The `TransformFunction` will read its configuration as JSON from
`userConfig` in the format:
```json
{
  "steps": [
    {
      "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
    },
    {
      "type": "merge-key-value"
    },
    {
      "type": "unwrap-key-value"
    },
    {
      "type": "cast", "schema-type": "STRING"
    }
  ]
}
```
Each step is defined by its `type` and uses its own arguments.
This example config, applied to a `KeyValue<AVRO, AVRO>` input record with
value `{key={keyField1: key1, keyField2: key2, keyField3: key3},
value={valueField1: value1, valueField2: value2, valueField3: value3}}`,
gives the following after each step:
```
{key={keyField1: key1, keyField2: key2, keyField3: key3},
 value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
    |
    | "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
    |
{key={keyField3: key3},
 value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
    |
    | "type": "merge-key-value"
    |
{key={keyField3: key3},
 value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
    |
    | "type": "unwrap-key-value"
    |
{keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO)
    |
    | "type": "cast", "schema-type": "STRING"
    |
{"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING)
```
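The walkthrough above can be reproduced with a small self-contained sketch. It is not the proposed implementation: `StepWalkthrough` and its members are hypothetical, records are string maps instead of AVRO objects, each step config is a flat `Map<String, String>`, and two behaviours are assumptions made for the sketch (a `drop-fields` step without `part` applies to both key and value, and `cast` only supports `STRING`).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class StepWalkthrough {
    Map<String, String> key;   // becomes null once the KeyValue is unwrapped
    Map<String, String> value;
    String castResult;         // set by the "cast" step

    StepWalkthrough(Map<String, String> key, Map<String, String> value) {
        this.key = new LinkedHashMap<>(key);
        this.value = new LinkedHashMap<>(value);
    }

    // Builds an insertion-ordered map from alternating names and values.
    static Map<String, String> ordered(String... kv) {
        Map<String, String> m = new LinkedHashMap<>();
        for (int i = 0; i < kv.length; i += 2) {
            m.put(kv[i], kv[i + 1]);
        }
        return m;
    }

    // Dispatches on the step's "type", mirroring the JSON config.
    void apply(Map<String, String> step) {
        switch (step.get("type")) {
            case "drop-fields": {
                String part = step.get("part"); // assumption: absent means both parts
                for (String field : step.get("fields").split(",")) {
                    if (!"value".equals(part) && key != null) key.remove(field);
                    if (!"key".equals(part)) value.remove(field);
                }
                break;
            }
            case "merge-key-value": {
                if (key != null) {
                    // Key fields first, then value fields, as in the walkthrough.
                    Map<String, String> merged = new LinkedHashMap<>(key);
                    merged.putAll(value);
                    value = merged;
                }
                break;
            }
            case "unwrap-key-value":
                key = null; // only the value survives
                break;
            case "cast":
                if (!"STRING".equals(step.get("schema-type"))) {
                    throw new UnsupportedOperationException("Sketch only casts to STRING");
                }
                castResult = value.entrySet().stream()
                        .map(e -> "\"" + e.getKey() + "\": \"" + e.getValue() + "\"")
                        .collect(Collectors.joining(", ", "{", "}"));
                break;
            default:
                throw new IllegalArgumentException("Unknown step type: " + step.get("type"));
        }
    }

    static StepWalkthrough run(Map<String, String> key, Map<String, String> value,
                               List<Map<String, String>> steps) {
        StepWalkthrough state = new StepWalkthrough(key, value);
        steps.forEach(state::apply);
        return state;
    }
}
```

Calling `StepWalkthrough.run(...)` with the example key, the example value and the four configured steps leaves `key` as `null` and produces the final STRING line of the walkthrough in `castResult`.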
`TransformFunction` will be built as a NAR including a `pulsar-io.yaml`
service file so it can be registered as a built-in function with name
`transform`.
## Rejected Alternatives
None
Re: [DISCUSS] PIP-173 : Create a built-in Function implementing the most common basic transformations
Posted by Christophe Bornet <bo...@gmail.com>.
Hi Neng,
See the comment from Enrico in the PIP issue
<https://github.com/apache/pulsar/issues/15902>. We've put this in
"Rejected alternatives" for the reasons that:
- it won't be easily available to all Pulsar users
- it would be hard to guarantee compatibility with many Pulsar versions,
and the Transformations will use many advanced features of Pulsar APIs
I agree with Enrico that we should maintain this function in the main repo.
I'll also add that:
- This is a key feature that Kafka has
<https://docs.confluent.io/platform/current/connect/transforms/overview.html>
and that is lacking in Pulsar
- A lot of users are asking for it (especially CDC users)
- The next step will be to be able to chain this function with a
Sink/Source to have an experience similar to Kafka
I hope this answers your concerns about this PIP.
Best regards
Christophe
On Wed, Jun 8, 2022 at 19:43, Neng Lu <fr...@gmail.com> wrote:
> I would suggest first having some concrete implementations in a separate
> repo.
> After verifying its functionality and performance, then we can move into
> the main pulsar repo.
Re: [DISCUSS] PIP-173 : Create a built-in Function implementing the most common basic transformations
Posted by Neng Lu <fr...@gmail.com>.
I would suggest first having some concrete implementations in a separate
repo.
After verifying their functionality and performance, we can then move them
into the main Pulsar repo.
On Fri, Jun 3, 2022 at 5:09 AM Enrico Olivelli <eo...@gmail.com> wrote:
> Overall I agree with the proposal.
> I left some minor feedback on the issue
>
> Thank you
>
> Enrico
--
Best Regards,
Neng
Re: [DISCUSS] PIP-173 : Create a built-in Function implementing the most common basic transformations
Posted by Enrico Olivelli <eo...@gmail.com>.
Overall I agree with the proposal.
I left some minor feedback on the issue
Thank you
Enrico