Posted to user@flink.apache.org by László Ciople <ci...@gmail.com> on 2020/12/08 09:20:34 UTC

Python UDF filter problem

Hello,
I am trying to use Flink v1.11.2 with Python and the Table API to read
messages from Kafka topics and write them back. I am trying to filter
messages based on the output of a UDF which returns a boolean. It seems
that Flink ignores the WHERE clause in my query, and every input message
ends up in the output topic.
The input table is declared in SQL:

--sql
CREATE TABLE teams_event (
    `payload` ROW(
        `createdDateTime` STRING,
        `body` ROW(
            `content` STRING
        ),
        `from` ROW(
            `user` ROW(
                `displayName` STRING
            )
        ),
        `channelIdentity` ROW(
            `channelId` STRING
        )
    )
) WITH (
    'connector' = 'kafka',
    'topic' = 'xdr.ms_teams2.events.messages',
    'properties.bootstrap.servers' = 'senso-kafka.solexdc01.bitdefender.biz:29030',
    'properties.group.id' = 'teams_profanity_filter',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset',
    'json.fail-on-missing-field' = 'false',
    'json.timestamp-format.standard' = 'ISO-8601'
)

The output table is also declared in SQL:

--sql
CREATE TABLE teams_profanity_event (
    `createdAt` STRING,
    `censoredMessage` STRING,
    `username` STRING,
    `channelId` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'internal.alerts.teams.rude_employees2',
    'properties.bootstrap.servers' = 'senso-kafka.solexdc01.bitdefender.biz:29030',
    'format' = 'json'
)

I have declared two UDFs and registered them in the table environment:

@udf(input_types=[
    DataTypes.ROW([
        DataTypes.FIELD("createdDateTime", DataTypes.STRING()),
        DataTypes.FIELD("body", DataTypes.ROW([
            DataTypes.FIELD("content", DataTypes.STRING())
        ])),
        DataTypes.FIELD("from", DataTypes.ROW([
            DataTypes.FIELD("user", DataTypes.ROW([
                DataTypes.FIELD("displayName", DataTypes.STRING())
            ]))
        ])),
        DataTypes.FIELD("channelIdentity", DataTypes.ROW([
            DataTypes.FIELD("channelId", DataTypes.STRING())
        ]))
    ])],
    result_type=DataTypes.BOOLEAN())
def contains_profanity(payload):
    message_content = payload[1][0]
    found_profanity = profanity.contains_profanity(message_content)
    logger.info(f'Message "{message_content}" contains profanity: {found_profanity}')
    return found_profanity


@udf(input_types=[
    DataTypes.ROW([
        DataTypes.FIELD("createdDateTime", DataTypes.STRING()),
        DataTypes.FIELD("body", DataTypes.ROW([
            DataTypes.FIELD("content", DataTypes.STRING())
        ])),
        DataTypes.FIELD("from", DataTypes.ROW([
            DataTypes.FIELD("user", DataTypes.ROW([
                DataTypes.FIELD("displayName", DataTypes.STRING())
            ]))
        ])),
        DataTypes.FIELD("channelIdentity", DataTypes.ROW([
            DataTypes.FIELD("channelId", DataTypes.STRING())
        ]))
    ])],
    result_type=DataTypes.STRING())
def censor_profanity(payload):
    message_content = payload[1][0]
    censored_message = profanity.censor(message_content)
    logger.info(f'Censored message: "{censored_message}"')
    return censored_message
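
For readers following the index access: both UDFs read `payload[1][0]`, which relies on the positional order of the fields declared in `input_types`. The sketch below mirrors that layout with plain Python tuples to show that index 1 is `body` and index 0 within it is `content`. It is only an illustration: it does not use PyFlink's Row type, and the sample values and the `message_content` helper are made up for this example.

```python
# Field order copied from the ROW type declared in the UDFs above.
PAYLOAD_FIELDS = ["createdDateTime", "body", "from", "channelIdentity"]
BODY_FIELDS = ["content"]

# Hypothetical sample payload, laid out positionally like the declared ROW.
sample_payload = (
    "2020-12-08T09:20:34Z",   # createdDateTime
    ("some message text",),   # body -> (content,)
    (("Jane Doe",),),         # from -> (user -> (displayName,))
    ("channel-1",),           # channelIdentity -> (channelId,)
)


def message_content(payload):
    """Return body.content using the same positional access as the UDFs."""
    body = payload[PAYLOAD_FIELDS.index("body")]   # resolves to index 1
    return body[BODY_FIELDS.index("content")]      # resolves to index 0


print(message_content(sample_payload))
```

If the field order in the table definition changes, the hard-coded indexes in the UDFs would need to change with it.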

The filtering of the messages and insertion into the sink is declared with
SQL:

--sql
INSERT INTO teams_profanity_event (
    SELECT  `payload`.`createdDateTime`,
            censor_profanity(`payload`),
            `payload`.`from`.`user`.`displayName`,
            `payload`.`channelIdentity`.`channelId`
    FROM teams_event
    WHERE contains_profanity(`payload`)
)

Am I doing something wrong? It seems that the contains_profanity UDF is not
used in the pipeline:
[image: image.png]
Thank you in advance!

Re: Python UDF filter problem

Posted by László Ciople <ci...@gmail.com>.
Awesome, thanks!

On Tue, Dec 8, 2020 at 11:55 AM Xingbo Huang <hx...@gmail.com> wrote:

> Hi,
>
> This problem has been fixed [1] in 1.10.3, 1.11.3, and 1.12.0, but 1.11.3
> and 1.12.0 have not been released yet (the vote has passed). I ran
> your job on release-1.12, and the plan is correct.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-19675
>
> Best,
> Xingbo
>

Re: Python UDF filter problem

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

This problem has been fixed [1] in 1.10.3, 1.11.3, and 1.12.0, but 1.11.3
and 1.12.0 have not been released yet (the vote has passed). I ran
your job on release-1.12, and the plan is correct.


[1] https://issues.apache.org/jira/browse/FLINK-19675

Best,
Xingbo
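
As a rough aid for checking whether a given Flink version already contains the fix, the fix versions listed above can be encoded in a small helper. This is a minimal sketch, not part of Flink: the names `FIX_VERSIONS`, `parse_version` and `has_fix` are hypothetical, it assumes plain numeric version strings such as `1.11.2`, and it assumes that release lines newer than 1.12 also carry the fix.

```python
# Fix versions for FLINK-19675 as listed in the reply, keyed by release line.
FIX_VERSIONS = {(1, 10): 3, (1, 11): 3, (1, 12): 0}


def parse_version(version):
    """Turn a string such as '1.11.2' into a 3-tuple of ints, padding with zeros."""
    parts = [int(part) for part in version.split(".")[:3]]
    parts += [0] * (3 - len(parts))
    return tuple(parts)


def has_fix(version):
    """Return True if the given Flink version is at or past a listed fix version."""
    major, minor, patch = parse_version(version)
    line = (major, minor)
    if line in FIX_VERSIONS:
        return patch >= FIX_VERSIONS[line]
    # Assumption: lines newer than 1.12 include the fix, older lines do not.
    return line > (1, 12)


print(has_fix("1.11.2"))  # the version used in the original question
print(has_fix("1.11.3"))
```

Under these assumptions, 1.11.2 is reported as not containing the fix, which matches the behaviour described in the question.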
