You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Manas Kale <ma...@gmail.com> on 2020/07/15 11:31:14 UTC

pyFlink UDTF function registration

Hi,
I am trying to use a UserDefined Table Function to split up some data as
follows:

from pyflink.table.udf import udtf

@udtf(input_types=DataTypes.STRING(), result_types=
[DataTypes.STRING(), DataTypes.DOUBLE()])
def split_feature_values(data_string):
    json_data = loads(data_string)
    for f_name, f_value in json_data.items():
        yield (f_name, f_value)

# configure the off-heap memory of current taskmanager to enable the
python worker uses off-heap memory.
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
'80m')

# Register UDTF
t_env.register_function("split", split_feature_values)
ddl_source = f"""
    CREATE TABLE {INPUT_TABLE} (
        `monitorId` STRING,
        `deviceId` STRING,
        `state` INT,
        `data` STRING,
        `time_st` TIMESTAMP(3),
        WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{INPUT_TOPIC}',
        'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
        'format' = 'json'
    )
"""

ddl_temporary_table = f"""
    CREATE TABLE {TEMPORARY_TABLE} (
        `monitorId` STRING,
        `featureName` STRING,
        `featureData` DOUBLE,
        `time_st` TIMESTAMP(3),
        WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
    )
"""

ddl_populate_temporary_table = f"""
    INSERT INTO {TEMPORARY_TABLE}
    SELECT monitorId, split(data), time_st
    FROM {INPUT_TABLE}
"""

t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_temporary_table)
t_env.execute_sql(ddl_populate_temporary_table)


However, I get the following error :
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: SQL validation failed.
From line 3, column 23 to line 3, column 33:* No match found for function
signature split(<CHARACTER>)*

I believe I am using the correct call to register the UDTF as per [1]. Am I
missing something?

Thanks,
Manas

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#table-functions

Re: pyFlink UDTF function registration

Posted by Manas Kale <ma...@gmail.com>.
Hi Xingbo,
Thank you for the elaboration. Note that all of this is for a streaming job.
I used this code to create a SQL VIEW :

f"""
    CREATE VIEW TMP_TABLE AS
    SELECT monitorId, featureName, featureData, time_st FROM (
    SELECT monitorId, featureName, featureData, time_st
    FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName,
featureData)) t
"""

and then tried executing a Tumbling window on it:

f"""
    INSERT INTO {OUTPUT_TABLE}
    SELECT monitorId, MAX(featureData), MIN(featureData),
AVG(featureData), TUMBLE_START(time_st, INTERVAL '2' SECOND)
    FROM TMP_TABLE
    GROUP BY TUMBLE(time_st, INTERVAL '2' SECOND), monitorId, featureName
""")

But I am getting this error:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column
'data' not found in any table

I don't understand how it is expecting column "data" as I explicitly did
not select that in the CREATE VIEW statement.
Also, is it valid to use an SQL VIEW for a streaming job such as this?

Regards,
Manas


On Wed, Jul 15, 2020 at 5:42 PM Xingbo Huang <hx...@gmail.com> wrote:

> Hi Manas,
> You need to join with the python udtf function. You can try the following
> sql:
>
> ddl_populate_temporary_table = f"""
>     INSERT INTO {TEMPORARY_TABLE}
>     SELECT * FROM (
>     SELECT monitorId, featureName, featureData, time_st
>     FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName,
> featureData)) t
> """
>
> Best,
> Xingbo
>
> Manas Kale <ma...@gmail.com> 于2020年7月15日周三 下午7:31写道:
>
>> Hi,
>> I am trying to use a UserDefined Table Function to split up some data as
>> follows:
>>
>> from pyflink.table.udf import udtf
>>
>> @udtf(input_types=DataTypes.STRING(), result_types= [DataTypes.STRING(), DataTypes.DOUBLE()])
>> def split_feature_values(data_string):
>>     json_data = loads(data_string)
>>     for f_name, f_value in json_data.items():
>>         yield (f_name, f_value)
>>
>> # configure the off-heap memory of current taskmanager to enable the python worker uses off-heap memory.
>> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
>>
>> # Register UDTF
>> t_env.register_function("split", split_feature_values)
>> ddl_source = f"""
>>     CREATE TABLE {INPUT_TABLE} (
>>         `monitorId` STRING,
>>         `deviceId` STRING,
>>         `state` INT,
>>         `data` STRING,
>>         `time_st` TIMESTAMP(3),
>>         WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
>>     ) WITH (
>>         'connector' = 'kafka',
>>         'topic' = '{INPUT_TOPIC}',
>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>         'format' = 'json'
>>     )
>> """
>>
>> ddl_temporary_table = f"""
>>     CREATE TABLE {TEMPORARY_TABLE} (
>>         `monitorId` STRING,
>>         `featureName` STRING,
>>         `featureData` DOUBLE,
>>         `time_st` TIMESTAMP(3),
>>         WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
>>     )
>> """
>>
>> ddl_populate_temporary_table = f"""
>>     INSERT INTO {TEMPORARY_TABLE}
>>     SELECT monitorId, split(data), time_st
>>     FROM {INPUT_TABLE}
>> """
>>
>> t_env.execute_sql(ddl_source)
>> t_env.execute_sql(ddl_temporary_table)
>> t_env.execute_sql(ddl_populate_temporary_table)
>>
>>
>> However, I get the following error :
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o5.executeSql.
>> : org.apache.flink.table.api.ValidationException: SQL validation failed.
>> From line 3, column 23 to line 3, column 33:* No match found for
>> function signature split(<CHARACTER>)*
>>
>> I believe I am using the correct call to register the UDTF as per [1]. Am
>> I missing something?
>>
>> Thanks,
>> Manas
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#table-functions
>>
>

Re: pyFlink UDTF function registration

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Manas,
You need to join with the python udtf function. You can try the following
sql:

ddl_populate_temporary_table = f"""
    INSERT INTO {TEMPORARY_TABLE}
    SELECT * FROM (
    SELECT monitorId, featureName, featureData, time_st
    FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName,
featureData)) t
"""

Best,
Xingbo

Manas Kale <ma...@gmail.com> 于2020年7月15日周三 下午7:31写道:

> Hi,
> I am trying to use a UserDefined Table Function to split up some data as
> follows:
>
> from pyflink.table.udf import udtf
>
> @udtf(input_types=DataTypes.STRING(), result_types= [DataTypes.STRING(), DataTypes.DOUBLE()])
> def split_feature_values(data_string):
>     json_data = loads(data_string)
>     for f_name, f_value in json_data.items():
>         yield (f_name, f_value)
>
> # configure the off-heap memory of current taskmanager to enable the python worker uses off-heap memory.
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
>
> # Register UDTF
> t_env.register_function("split", split_feature_values)
> ddl_source = f"""
>     CREATE TABLE {INPUT_TABLE} (
>         `monitorId` STRING,
>         `deviceId` STRING,
>         `state` INT,
>         `data` STRING,
>         `time_st` TIMESTAMP(3),
>         WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
>     ) WITH (
>         'connector' = 'kafka',
>         'topic' = '{INPUT_TOPIC}',
>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>         'format' = 'json'
>     )
> """
>
> ddl_temporary_table = f"""
>     CREATE TABLE {TEMPORARY_TABLE} (
>         `monitorId` STRING,
>         `featureName` STRING,
>         `featureData` DOUBLE,
>         `time_st` TIMESTAMP(3),
>         WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
>     )
> """
>
> ddl_populate_temporary_table = f"""
>     INSERT INTO {TEMPORARY_TABLE}
>     SELECT monitorId, split(data), time_st
>     FROM {INPUT_TABLE}
> """
>
> t_env.execute_sql(ddl_source)
> t_env.execute_sql(ddl_temporary_table)
> t_env.execute_sql(ddl_populate_temporary_table)
>
>
> However, I get the following error :
> py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
> : org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 3, column 23 to line 3, column 33:* No match found for function
> signature split(<CHARACTER>)*
>
> I believe I am using the correct call to register the UDTF as per [1]. Am
> I missing something?
>
> Thanks,
> Manas
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#table-functions
>