Posted to dev@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2022/05/13 06:44:00 UTC

[jira] [Created] (FLINK-27598) Improve the exception message when mixing use of Python UDFs and Pandas UDFs

Dian Fu created FLINK-27598:
-------------------------------

             Summary: Improve the exception message when mixing use of Python UDFs and Pandas UDFs
                 Key: FLINK-27598
                 URL: https://issues.apache.org/jira/browse/FLINK-27598
             Project: Flink
          Issue Type: Bug
          Components: API / Python
            Reporter: Dian Fu


For the following job:
{code}

import argparse
from decimal import Decimal

from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import AggregateFunction, udaf


class DeduplicatedSum(AggregateFunction):
    def create_accumulator(self):
        return {int(0): float(0)}

    def get_value(self, accumulator) -> float:
        return sum(accumulator.values())

    def accumulate(self, accumulator, k: int, v: float):
        if k not in accumulator:
            accumulator[k] = v

    def retract(self, accumulator, k: int, v: float):
        if k in accumulator:
            del accumulator[k]


deduplicated_sum = udaf(f=DeduplicatedSum(),
                        func_type="pandas",
                        result_type=DataTypes.DOUBLE(),
                        input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])


class FirstValue(AggregateFunction):
    def create_accumulator(self):
        return [int(-1), float(0)]

    def get_value(self, accumulator) -> float:
        return accumulator[1]

    def accumulate(self, accumulator, k: int, v: float):
        ck = accumulator[0]
        if ck > k:
            accumulator[0] = k
            accumulator[1] = v


first_value = udaf(f=FirstValue(),
                   result_type=DataTypes.DOUBLE(),
                   func_type="pandas",
                   input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])


class LastValue(AggregateFunction):
    def create_accumulator(self):
        return [int(-1), float(0)]

    def get_value(self, accumulator: Row) -> float:
        return accumulator[1]

    def accumulate(self, accumulator: Row, k: int, v: float):
        ck = accumulator[0]
        if ck < k:
            accumulator[0] = k
            accumulator[1] = v


last_value = udaf(f=LastValue(),
                  func_type="pandas",
                  result_type=DataTypes.DOUBLE(),
                  input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])


def create_source_table_trades(table_env):
    source = f"""
    CREATE TABLE src_trade (
        `id` VARCHAR
        ,`timestamp` BIGINT
        ,`side` VARCHAR
        ,`price` DOUBLE
        ,`size` DOUBLE
        ,`uniqueId` BIGINT
        ,ts_micro AS `timestamp`
        ,ts_milli AS `timestamp` / 1000
        ,ts AS TO_TIMESTAMP_LTZ(`timestamp` / 1000, 3)
        ,WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
    ) WITH (
        'connector' = 'datagen')
    """
    table_env.execute_sql(source)


def create_sink_table(table_env):
    sink = f"""
    CREATE TABLE dst_kline (
        wst TIMESTAMP_LTZ(3)
        ,wet TIMESTAMP_LTZ(3)
        ,otm BIGINT
        ,ot TIMESTAMP_LTZ(3)
        ,ctm BIGINT
        ,ct TIMESTAMP_LTZ(3)
        ,ptm BIGINT
        ,pt TIMESTAMP_LTZ(3)
        ,`open` DOUBLE
        ,`close` DOUBLE
        ,`high` DOUBLE
        ,`low` DOUBLE
        ,`vol` DOUBLE -- total trade volume
        ,`to` DOUBLE -- total turnover value
        ,`rev` INT -- revision, something we might use for versioning
        ,`gap` INT -- if this value is reliable
        ,PRIMARY KEY(wst) NOT ENFORCED
    ) WITH (
        'connector' = 'print'
    )
    """
    table_env.execute_sql(sink)


def kafka_src_topic(value):
    if not len(value.split('-')) == 5:
        raise argparse.ArgumentTypeError("{} is not a valid kafka topic".format(value))
    return value


def interval(value):
    i = []
    prev_num = []
    for character in value:
        if character.isalpha():
            if prev_num:
                num = Decimal(''.join(prev_num))
                if character == 'd':
                    i.append(f"'{num}' DAYS")
                elif character == 'h':
                    i.append(f"'{num}' HOURS")
                elif character == 'm':
                    i.append(f"'{num}' MINUTES")
                elif character == 's':
                    i.append(f"'{num}' SECONDS")
                prev_num = []
        elif character.isnumeric() or character == '.':
            prev_num.append(character)
    return " ".join(i)


def fetch_arguments_flink_kline():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap-servers', type=str, required=True)
    parser.add_argument('--src-topic', type=kafka_src_topic)
    parser.add_argument('--consume-mode', type=str, default='group-offsets',
                        choices=['group-offsets', 'latest-offset'],
                        help='scan.startup.mode for kafka')
    parser.add_argument('--interval', type=str, default='20s',
                        help='output interval e.g. 5d4h3m1s, default to 20s')
    parser.add_argument('--force-test', action='store_true')
    parser.add_argument('--consumer-group-hint', type=str, default='1')
    args = parser.parse_args()
    if args.force_test and args.consumer_group_hint == '1':
        parser.error("With --force-test, should not use default '1' for --consumer-group-hint")
    return args


def main():
    # args = fetch_arguments_flink_kline()
    # parts = args.src_topic.split('-')
    # _, e, p, s, _ = parts
    # dst_topic = f'{e}-{p}-{s}-Kline{args.interval}'

    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(env)
    # table_env.get_config().get_configuration().set_boolean("table.exec.emit.early-fire.enabled", True)
    # table_env.get_config().get_configuration().set_string("table.exec.emit.early-fire.delay", "0 s")
    table_env.get_config().get_configuration().set_string("table.exec.emit.allow-lateness", "1 h")
    # table_env.get_config().get_configuration().set_boolean("table.exec.emit.late-fire.enabled", True)
    # table_env.get_config().get_configuration().set_string("table.exec.emit.late-fire.delay", "0 s")
    table_env.create_temporary_function("deduplicated_sum", deduplicated_sum)
    table_env.create_temporary_function("first_value", first_value)
    table_env.create_temporary_function("last_value", last_value)

    create_source_table_trades(table_env)
    create_sink_table(table_env)
    stmt = f"""
    INSERT INTO dst_kline
    SELECT TUMBLE_START(ts, INTERVAL '1' DAY)
        ,TUMBLE_END(ts, INTERVAL '1' DAY)
        ,MIN(ts_milli)
        ,MIN(ts) AS st
        ,MAX(ts_milli)
        ,MAX(ts) AS et
        ,EXTRACT(MILLISECOND FROM CURRENT_TIMESTAMP) + UNIX_TIMESTAMP() * 1000
        ,CURRENT_TIMESTAMP
        ,first_value(ts_micro, price)
        ,last_value(ts_micro, price)
        ,MAX(price)
        ,MIN(price)
        ,deduplicated_sum(uniqueId, `size`)
        ,deduplicated_sum(uniqueId, price * `size`)
        ,1
        ,CAST((MAX(ts_milli) - MIN(ts_milli)) / 1000 AS INT)
    FROM src_trade
    GROUP BY TUMBLE(ts, INTERVAL '1' DAY)
    """
    table_env.execute_sql(stmt)


if __name__ == '__main__':
    main()
{code}

It throws the following exception:
{code}
Traceback (most recent call last):
  File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/kl(3).py", line 207, in <module>
    main()
  File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/kl(3).py", line 203, in main
    table_env.execute_sql(stmt)
  File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 876, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
    at org.apache.flink.table.functions.python.PythonFunctionInfo.<init>(PythonFunctionInfo.java:45)
    at org.apache.flink.table.functions.python.PythonAggregateFunctionInfo.<init>(PythonAggregateFunctionInfo.java:36)
    at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.extractPythonAggregateFunctionInfosFromAggregateCall(CommonPythonUtil.java:236)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate.createPandasPythonStreamWindowGroupOneInputTransformation(StreamExecPythonGroupWindowAggregate.java:365)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate.translateToPlanInternal(StreamExecPythonGroupWindowAggregate.java:264)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:71)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:872)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
{code}

This NullPointerException is confusing: it gives users no hint about what is actually wrong with the job. The exception message should be improved.
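
For reference, below is a minimal sketch (not taken from the job above) of the kind of mix the summary refers to: a general Python UDAF and a Pandas UDAF used in the same aggregation. The names GeneralSum, general_sum and pandas_mean, as well as the table `src` and its columns `k` and `v`, are made up for illustration.
{code}
from pyflink.common import Row
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import AggregateFunction, udaf


class GeneralSum(AggregateFunction):
    """A general (non-pandas) Python UDAF that sums its input."""

    def create_accumulator(self):
        return Row(0.0)

    def get_value(self, accumulator):
        return accumulator[0]

    def accumulate(self, accumulator, v):
        accumulator[0] += v


# func_type defaults to "general", i.e. a regular Python UDAF.
general_sum = udaf(GeneralSum(),
                   result_type=DataTypes.DOUBLE(),
                   accumulator_type=DataTypes.ROW([DataTypes.FIELD("s", DataTypes.DOUBLE())]),
                   input_types=[DataTypes.DOUBLE()])

# func_type="pandas" makes this a Pandas UDAF operating on pandas.Series.
pandas_mean = udaf(lambda v: v.mean(),
                   result_type=DataTypes.DOUBLE(),
                   input_types=[DataTypes.DOUBLE()],
                   func_type="pandas")

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.create_temporary_function("general_sum", general_sum)
t_env.create_temporary_function("pandas_mean", pandas_mean)

# `src` is a placeholder table with columns k and v; calling both UDAF types
# in a single aggregation is the mix described in the summary.
t_env.execute_sql("""
    SELECT k, general_sum(v), pandas_mean(v)
    FROM src
    GROUP BY k
""")
{code}
If such a mix is not supported, the exception should say so explicitly and name the offending functions, rather than surfacing a NullPointerException from the planner.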



--
This message was sent by Atlassian Jira
(v8.20.7#820007)