You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2022/05/13 06:44:00 UTC
[jira] [Created] (FLINK-27598) Improve the exception message when mixing Python UDFs and Pandas UDFs
Dian Fu created FLINK-27598:
-------------------------------
Summary: Improve the exception message when mixing Python UDFs and Pandas UDFs
Key: FLINK-27598
URL: https://issues.apache.org/jira/browse/FLINK-27598
Project: Flink
Issue Type: Bug
Components: API / Python
Reporter: Dian Fu
For the following job:
{code}
import argparse
from decimal import Decimal
from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import AggregateFunction, udaf
class DeduplicatedSum(AggregateFunction):
    """Sums values keyed by a unique id, counting each id at most once."""

    def create_accumulator(self):
        # BUG FIX: the original returned a set literal {int(0), float(0)},
        # but accumulate/retract index into the accumulator by key, so it
        # must be a dict (unique id -> value).
        return {}

    def get_value(self, accumulator) -> float:
        # BUG FIX: the original computed the sum but never returned it.
        return sum(accumulator.values())

    def accumulate(self, accumulator, k: int, v: float):
        # Only the first value seen for a given id contributes to the sum.
        if k not in accumulator:
            accumulator[k] = v

    def retract(self, accumulator, k: int, v: float):
        if k in accumulator:
            del accumulator[k]
# Register DeduplicatedSum as a pandas-mode UDAF: (BIGINT, DOUBLE) -> DOUBLE.
deduplicated_sum = udaf(
    f=DeduplicatedSum(),
    input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()],
    result_type=DataTypes.DOUBLE(),
    func_type="pandas",
)
class FirstValue(AggregateFunction):
    """Keeps the value associated with the smallest key (timestamp) seen."""

    def create_accumulator(self):
        # [key, value]; key -1 is a sentinel meaning "no row seen yet".
        return [int(-1), float(0)]

    def get_value(self, accumulator) -> float:
        return accumulator[1]

    def accumulate(self, accumulator, k: int, v: float):
        ck = accumulator[0]
        # BUG FIX: with the -1 sentinel the original test `ck > k` was never
        # true for non-negative keys, so no value was ever recorded and
        # get_value always returned 0.0. Accept the first row explicitly.
        if ck < 0 or ck > k:
            accumulator[0] = k
            accumulator[1] = v
# Register FirstValue as a pandas-mode UDAF: (BIGINT, DOUBLE) -> DOUBLE.
first_value = udaf(
    f=FirstValue(),
    input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()],
    result_type=DataTypes.DOUBLE(),
    func_type="pandas",
)
class LastValue(AggregateFunction):
    """Keeps the value associated with the largest key (timestamp) seen."""

    def create_accumulator(self):
        # [key, value]; key starts below any non-negative timestamp, so the
        # first accumulated row always wins.
        return [int(-1), float(0)]

    def get_value(self, accumulator: Row) -> float:
        return accumulator[1]

    def accumulate(self, accumulator: Row, k: int, v: float):
        # Replace the stored pair whenever a strictly newer key arrives.
        if accumulator[0] < k:
            accumulator[0] = k
            accumulator[1] = v
# Register LastValue as a pandas-mode UDAF: (BIGINT, DOUBLE) -> DOUBLE.
last_value = udaf(
    f=LastValue(),
    input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()],
    result_type=DataTypes.DOUBLE(),
    func_type="pandas",
)
def create_source_table_trades(table_env):
    """Register the `src_trade` datagen source table on *table_env*.

    The table derives event-time columns from the raw microsecond
    `timestamp` field and declares a watermark on the millisecond one.
    """
    ddl = """
    CREATE TABLE src_trade (
    `id` VARCHAR
    ,`timestamp` BIGINT
    ,`side` VARCHAR
    ,`price` DOUBLE
    ,`size` DOUBLE
    ,`uniqueId` BIGINT
    ,ts_micro AS `timestamp`
    ,ts_milli AS `timestamp` / 1000
    ,ts AS TO_TIMESTAMP_LTZ(`timestamp` / 1000, 3)
    ,WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
    ) WITH (
    'connector' = 'datagen')
    """
    table_env.execute_sql(ddl)
def create_sink_table(table_env):
    """Register the `dst_kline` print sink table on *table_env*.

    One row per window: open/close/high/low, volume, turnover and the
    various open/close/process timestamps.
    """
    ddl = """
    CREATE TABLE dst_kline (
    wst TIMESTAMP_LTZ(3)
    ,wet TIMESTAMP_LTZ(3)
    ,otm BIGINT
    ,ot TIMESTAMP_LTZ(3)
    ,ctm BIGINT
    ,ct TIMESTAMP_LTZ(3)
    ,ptm BIGINT
    ,pt TIMESTAMP_LTZ(3)
    ,`open` DOUBLE
    ,`close` DOUBLE
    ,`high` DOUBLE
    ,`low` DOUBLE
    ,`vol` DOUBLE -- total trade volume
    ,`to` DOUBLE -- total turnover value
    ,`rev` INT -- revision, something we might use for versioning
    ,`gap` INT -- if this value is reliable
    ,PRIMARY KEY(wst) NOT ENFORCED
    ) WITH (
    'connector' = 'print'
    )
    """
    table_env.execute_sql(ddl)
def kafka_src_topic(value):
    """argparse type function: validate a kafka topic name.

    A valid topic has exactly five '-'-separated parts. Raises
    argparse.ArgumentTypeError otherwise so argparse reports a clean
    usage error; returns the value unchanged when valid.
    """
    # Idiom fix: `!=` instead of `not ... == ...`.
    if len(value.split('-')) != 5:
        raise argparse.ArgumentTypeError("{} is not a valid kafka topic".format(value))
    return value
def interval(value):
    """Translate a compact interval spec (e.g. '5d4h3m1s') into SQL text.

    Digits (and '.') accumulate until a unit letter is reached; each
    number/unit pair becomes a quoted SQL interval fragment, e.g.
    "'5' DAYS '4' HOURS '3' MINUTES '1' SECONDS". Unit letters other than
    d/h/m/s consume (discard) the pending number.
    """
    fragments = []
    pending = []
    for ch in value:
        if ch.isalpha():
            if pending:
                num = Decimal(''.join(pending))
                # BUG FIX: the archived code had the placeholders mangled as
                # \{num}, which would emit a literal backslash; restore the
                # intended f-string interpolation.
                if ch == 'd':
                    fragments.append(f"'{num}' DAYS")
                elif ch == 'h':
                    fragments.append(f"'{num}' HOURS")
                elif ch == 'm':
                    fragments.append(f"'{num}' MINUTES")
                elif ch == 's':
                    fragments.append(f"'{num}' SECONDS")
                pending = []
        elif ch.isnumeric() or ch == '.':
            pending.append(ch)
    return " ".join(fragments)
def fetch_arguments_flink_kline():
    """Parse command-line arguments for the kline job.

    Returns the argparse namespace. Errors out (via parser.error) when
    --force-test is combined with the default consumer group hint, so a
    test run cannot accidentally reuse the production consumer group.
    """
    # Fix: dropped the redundant function-local `import argparse`; the
    # module is already imported at the top of the file.
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap-servers', type=str, required=True)
    # kafka_src_topic validates the topic shape at parse time.
    parser.add_argument('--src-topic', type=kafka_src_topic)
    parser.add_argument('--consume-mode', type=str, default='group-offsets',
                        choices=['group-offsets', 'latest-offset'],
                        help='scan.startup.mode for kafka')
    parser.add_argument('--interval', type=str, default='20s',
                        help='output interval e.g. 5d4h3m1s, default to 20s')
    parser.add_argument('--force-test', action='store_true')
    parser.add_argument('--consumer-group-hint', type=str, default='1')
    args = parser.parse_args()
    if args.force_test and args.consumer_group_hint == '1':
        parser.error("With --force-test, should not use default '1' for --consumer-group-hint")
    return args
def main():
    """Build and run the 1-day kline aggregation job (datagen -> print)."""
    # args = fetch_arguments_flink_kline()
    # parts = args.src_topic.split('-')
    # _, e, p, s, _ = parts
    # dst_topic = f'{e}-{p}-{s}-Kline{args.interval}'
    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(env)
    # table_env.get_config().get_configuration().set_boolean("table.exec.emit.early-fire.enabled", True)
    # table_env.get_config().get_configuration().set_string("table.exec.emit.early-fire.delay", "0 s")
    # Accept rows arriving up to an hour late before window results are final.
    table_env.get_config().get_configuration().set_string("table.exec.emit.allow-lateness", "1 h")
    # table_env.get_config().get_configuration().set_boolean("table.exec.emit.late-fire.enabled", True)
    # table_env.get_config().get_configuration().set_string("table.exec.emit.late-fire.delay", "0 s")
    # All three UDAFs are declared with func_type="pandas"; running this job
    # raises the NullPointerException shown below (FLINK-27598).
    table_env.create_temporary_function("deduplicated_sum", deduplicated_sum)
    table_env.create_temporary_function("first_value", first_value)
    table_env.create_temporary_function("last_value", last_value)
    create_source_table_trades(table_env)
    create_sink_table(table_env)
    # One output row per 1-day tumbling event-time window.
    stmt = f"""
    INSERT INTO dst_kline
    SELECT TUMBLE_START(ts, INTERVAL '1' DAY)
    ,TUMBLE_END(ts, INTERVAL '1' DAY)
    ,MIN(ts_milli)
    ,MIN(ts) AS st
    ,MAX(ts_milli)
    ,MAX(ts) AS et
    ,EXTRACT(MILLISECOND FROM CURRENT_TIMESTAMP) + UNIX_TIMESTAMP() * 1000
    ,CURRENT_TIMESTAMP
    ,first_value(ts_micro, price)
    ,last_value(ts_micro, price)
    ,MAX(price)
    ,MIN(price)
    ,deduplicated_sum(uniqueId, `size`)
    ,deduplicated_sum(uniqueId, price * `size`)
    ,1
    ,CAST((MAX(ts_milli) - MIN(ts_milli)) / 1000 AS INT)
    FROM src_trade
    GROUP BY TUMBLE(ts, INTERVAL '1' DAY)
    """
    table_env.execute_sql(stmt)

if __name__ == '__main__':
    main()
{code}
It throws the following exception:
{code}
Traceback (most recent call last):
File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/kl(3).py", line 207, in <module>
main()
File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/kl(3).py", line 203, in main
table_env.execute_sql(stmt)
File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 876, in execute_sql
return TableResult(self._j_tenv.executeSql(stmt))
File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
return f(*a, **kw)
File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
at org.apache.flink.table.functions.python.PythonFunctionInfo.<init>(PythonFunctionInfo.java:45)
at org.apache.flink.table.functions.python.PythonAggregateFunctionInfo.<init>(PythonAggregateFunctionInfo.java:36)
at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.extractPythonAggregateFunctionInfosFromAggregateCall(CommonPythonUtil.java:236)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate.createPandasPythonStreamWindowGroupOneInputTransformation(StreamExecPythonGroupWindowAggregate.java:365)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate.translateToPlanInternal(StreamExecPythonGroupWindowAggregate.java:264)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:71)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:872)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
{code}
The exception message is confusing and should be improved.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)