You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2023/04/24 07:21:00 UTC
[jira] [Created] (FLINK-31905) Exception thrown when accessing nested field of the result of Python UDF with complex type
Dian Fu created FLINK-31905:
-------------------------------
Summary: Exception thrown when accessing nested field of the result of Python UDF with complex type
Key: FLINK-31905
URL: https://issues.apache.org/jira/browse/FLINK-31905
Project: Flink
Issue Type: Bug
Components: API / Python
Reporter: Dian Fu
For the following job:
{code}
import logging, sys
from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import Schema, DataTypes, TableDescriptor, StreamTableEnvironment
from pyflink.table.expressions import col, row
from pyflink.table.udf import ACC, T, udaf, AggregateFunction, udf
logging.basicConfig(stream=sys.stdout, level=logging.ERROR, format="%(message)s")
class EmitLastState(AggregateFunction):
"""
Aggregator that emits the latest state for the purpose of
enabling parallelism on CDC tables.
"""
def create_accumulator(self) -> ACC:
return Row(None, None)
def accumulate(self, accumulator: ACC, *args):
key, obj = args
if (accumulator[0] is None) or (key > accumulator[0]):
accumulator[0] = key
accumulator[1] = obj
def retract(self, accumulator: ACC, *args):
pass
def get_value(self, accumulator: ACC) -> T:
return accumulator[1]
some_complex_inner_type = DataTypes.ROW(
[
DataTypes.FIELD("f0", DataTypes.STRING()),
DataTypes.FIELD("f1", DataTypes.STRING())
]
)
some_complex_type = DataTypes.ROW(
[
DataTypes.FIELD(k, DataTypes.ARRAY(some_complex_inner_type))
for k in ("f0", "f1", "f2")
]
+ [
DataTypes.FIELD("f3", DataTypes.DATE()),
DataTypes.FIELD("f4", DataTypes.VARCHAR(32)),
DataTypes.FIELD("f5", DataTypes.VARCHAR(2)),
]
)
@udf(input_types=DataTypes.STRING(), result_type=some_complex_type)
def complex_udf(s):
return Row(f0=None, f1=None, f2=None, f3=None, f4=None, f5=None)
if __name__ == "__main__":
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
table_env.get_config().set('pipeline.classpaths', 'file:///Users/dianfu/code/src/workspace/pyflink-examples/flink-sql-connector-postgres-cdc-2.1.1.jar')
# Create schema
_schema = {
"p_key": DataTypes.INT(False),
"modified_id": DataTypes.INT(False),
"content": DataTypes.STRING()
}
schema = Schema.new_builder().from_fields(
*zip(*[(k, v) for k, v in _schema.items()])
).\
primary_key("p_key").\
build()
# Create table descriptor
descriptor = TableDescriptor.for_connector("postgres-cdc").\
option("hostname", "host.docker.internal").\
option("port", "5432").\
option("database-name", "flink_issue").\
option("username", "root").\
option("password", "root").\
option("debezium.plugin.name", "pgoutput").\
option("schema-name", "flink_schema").\
option("table-name", "flink_table").\
option("slot.name", "flink_slot").\
schema(schema).\
build()
table_env.create_temporary_table("flink_table", descriptor)
# Create changelog stream
stream = table_env.from_path("flink_table")
# Define UDAF
accumulator_type = DataTypes.ROW(
[
DataTypes.FIELD("f0", DataTypes.INT(False)),
DataTypes.FIELD("f1", DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in _schema.items()])),
]
)
result_type = DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in _schema.items()])
emit_last = udaf(EmitLastState(), accumulator_type=accumulator_type, result_type=result_type)
# Emit last state based on modified_id to enable parallel processing
stream = stream.\
group_by(col("p_key")).\
select(
col("p_key"),
emit_last(col("modified_id"),row(*(col(k) for k in _schema.keys())).cast(result_type)).alias("tmp_obj")
)
# Select the elements of the objects
stream = stream.select(*(col("tmp_obj").get(k).alias(k) for k in _schema.keys()))
# We apply a UDF which parses the xml and returns a complex nested structure
stream = stream.select(col("p_key"), complex_udf(col("content")).alias("nested_obj"))
# We select an element from the nested structure in order to flatten it
# The next line is the line causing issues, commenting the next line will make the pipeline work
stream = stream.select(col("p_key"), col("nested_obj").get("f0"))
# Interestingly, the below part does work...
# stream = stream.select(col("nested_obj").get("f0"))
table_env.to_changelog_stream(stream).print()
# Execute
env.execute_async()
{code}
{code}
py4j.protocol.Py4JJavaError: An error occurred while calling o8.toChangelogStream.
: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
at java.base/java.util.Objects.checkIndex(Unknown Source)
at java.base/java.util.ArrayList.get(Unknown Source)
at org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:975)
at org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:924)
at org.apache.calcite.rex.RexLocalRef.accept(RexLocalRef.java:75)
at org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:198)
at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:904)
at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:887)
at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:295)
at org.apache.calcite.rex.RexProgramBuilder.addProject(RexProgramBuilder.java:206)
at org.apache.calcite.rex.RexProgram.create(RexProgram.java:224)
at org.apache.calcite.rex.RexProgram.create(RexProgram.java:193)
at org.apache.flink.table.planner.plan.rules.logical.PythonCalcSplitRuleBase.onMatch(PythonCalcSplitRule.scala:98)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.immutable.Range.foreach(Range.scala:155)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)