Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2020/04/12 09:13:00 UTC

[jira] [Updated] (FLINK-17093) Python UDF doesn't work when the input column is from composite field

     [ https://issues.apache.org/jira/browse/FLINK-17093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu updated FLINK-17093:
----------------------------
    Summary: Python UDF doesn't work when the input column is from composite field  (was: Python UDF doesn't work when the input column is of composite type)

> Python UDF doesn't work when the input column is from composite field
> ---------------------------------------------------------------------
>
>                 Key: FLINK-17093
>                 URL: https://issues.apache.org/jira/browse/FLINK-17093
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.10.0
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Blocker
>             Fix For: 1.10.1, 1.11.0
>
>
> For the following job:
> {code}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import BatchTableEnvironment, StreamTableEnvironment, EnvironmentSettings, CsvTableSink
> from pyflink.table.descriptors import Schema, Kafka, Json
> from pyflink.table import DataTypes
> from pyflink.table.udf import ScalarFunction, udf
> import os
>
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
>                   DataTypes.STRING()],
>      result_type=DataTypes.STRING())
> def get_host_ip(source, qr, sip, dip):
>     if source == "NGAF" and qr == '1':
>         return dip
>     return sip
>
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
>                   DataTypes.STRING()],
>      result_type=DataTypes.STRING())
> def get_dns_server_ip(source, qr, sip, dip):
>     if source == "NGAF" and qr == '1':
>         return sip
>     return dip
>
>
> def test_case():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     t_env = StreamTableEnvironment.create(env)
>     from pyflink.table import Row
>     table = t_env.from_elements(
>         [("DNS", Row(source="source", devid="devid", sip="sip", dip="dip", qr="qr",
>                      queries="queries", answers="answers", qtypes="qtypes",
>                      atypes="atypes", rcode="rcode", ts="ts"))],
>         DataTypes.ROW([DataTypes.FIELD("stype", DataTypes.STRING()),
>                        DataTypes.FIELD("data", DataTypes.ROW(
>                            [DataTypes.FIELD("source", DataTypes.STRING()),
>                             DataTypes.FIELD("devid", DataTypes.STRING()),
>                             DataTypes.FIELD("sip", DataTypes.STRING()),
>                             DataTypes.FIELD("dip", DataTypes.STRING()),
>                             DataTypes.FIELD("qr", DataTypes.STRING()),
>                             DataTypes.FIELD("queries", DataTypes.STRING()),
>                             DataTypes.FIELD("answers", DataTypes.STRING()),
>                             DataTypes.FIELD("qtypes", DataTypes.STRING()),
>                             DataTypes.FIELD("atypes", DataTypes.STRING()),
>                             DataTypes.FIELD("rcode", DataTypes.STRING()),
>                             DataTypes.FIELD("ts", DataTypes.STRING())]))]))
>     result_file = "/tmp/test.csv"
>     if os.path.exists(result_file):
>         os.remove(result_file)
>     t_env.register_table_sink(
>         "Results",
>         CsvTableSink(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n'],
>                      [DataTypes.STRING()] * 14,
>                      result_file))
>     t_env.register_function("get_host_ip", get_host_ip)
>     t_env.register_function("get_dns_server_ip", get_dns_server_ip)
>     t_env.register_table("source", table)
>     standard_table = t_env.sql_query("select data.*, stype as dns_type from source") \
>         .where("dns_type.in('DNSFULL', 'DNS', 'DNSFULL_FROM_LOG', 'DNS_FROM_LOG')")
>     t_env.register_table("standard_table", standard_table)
>     final_table = t_env.sql_query("SELECT *, get_host_ip(source, qr, sip, dip) as host_ip, "
>                                   "get_dns_server_ip(source, qr, sip, dip) as dns_server_ip "
>                                   "FROM standard_table")
>     final_table.insert_into("Results")
>     t_env.execute("test")
>
>
> if __name__ == '__main__':
>     test_case()
> {code}
> The generated plan is as follows, which is not correct:
> {code}
>  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaTableSource(type, data) -> Map -> where: (IN(type, _UTF-16LE'DNSFULL', _UTF-16LE'DNS', _UTF-16LE'DNSFULL_FROM_LOG', _UTF-16LE'DNS_FROM_LOG')), select: (data, type) -> select: (type, get_host_ip(type.source, type.qr, type.sip, type.dip) AS f0, get_dns_server_ip(type.source, type.qr, type.sip, type.dip) AS f1) -> select: (f0.source AS source, f0.devid AS devid, f0.sip AS sip, f0.dip AS dip, f0.qr AS qr, f0.queries AS queries, f0.answers AS answers, f0.qtypes AS qtypes, f0.atypes AS atypes, f0.rcode AS rcode, f0.ts AS ts, type AS dns_type, f0 AS host_ip, f1 AS dns_server_ip) -> to: Row -> Sink: KafkaTableSink(source, devid, sip, dip, qr, queries, answers, qtypes, atypes, rcode, ts, dns_type, host_ip, dns_server_ip) (1/4) (8d064ab137866a2a9040392a87bcc59d) switched from RUNNING to FAILED.
> {code}
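For reference, the two UDFs in the repro are independent of Flink itself: stripped of the {{@udf}} decorators they are plain functions that select either {{sip}} or {{dip}} depending on whether the record is an NGAF answer ({{qr == '1'}}). A standalone sketch of that logic (no PyFlink required, values are illustrative):

```python
# Plain-Python sketch of the two UDFs from the repro above,
# with the PyFlink @udf decorators removed so the selection
# logic can be exercised on its own.

def get_host_ip(source, qr, sip, dip):
    # For NGAF answer records (qr == '1') the host is the
    # destination IP; otherwise it is the source IP.
    if source == "NGAF" and qr == '1':
        return dip
    return sip


def get_dns_server_ip(source, qr, sip, dip):
    # Mirror image of get_host_ip: the DNS server sits on
    # the opposite side of the same record.
    if source == "NGAF" and qr == '1':
        return sip
    return dip


print(get_host_ip("NGAF", '1', "10.0.0.1", "10.0.0.2"))        # -> 10.0.0.2
print(get_dns_server_ip("NGAF", '1', "10.0.0.1", "10.0.0.2"))  # -> 10.0.0.1
print(get_host_ip("DNS", '0', "10.0.0.1", "10.0.0.2"))         # -> 10.0.0.1
```

The bug is therefore not in the UDF bodies but in how the planner rewrites their arguments: in the plan above the calls receive {{type.source}}, {{type.qr}}, etc. (fields re-resolved against the wrong column) instead of the flattened columns produced by {{select data.*}}.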



--
This message was sent by Atlassian Jira
(v8.3.4#803005)