Posted to dev@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2023/04/24 07:21:00 UTC

[jira] [Created] (FLINK-31905) Exception thrown when accessing nested field of the result of Python UDF with complex type

Dian Fu created FLINK-31905:
-------------------------------

             Summary: Exception thrown when accessing nested field of the result of Python UDF with complex type
                 Key: FLINK-31905
                 URL: https://issues.apache.org/jira/browse/FLINK-31905
             Project: Flink
          Issue Type: Bug
          Components: API / Python
            Reporter: Dian Fu


For the following job:
{code}
import logging, sys

from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import Schema, DataTypes, TableDescriptor, StreamTableEnvironment
from pyflink.table.expressions import col, row
from pyflink.table.udf import ACC, T, udaf, AggregateFunction, udf

logging.basicConfig(stream=sys.stdout, level=logging.ERROR, format="%(message)s")


class EmitLastState(AggregateFunction):
    """
    Aggregator that emits the latest state for the purpose of
    enabling parallelism on CDC tables.
    """

    def create_accumulator(self) -> ACC:
        return Row(None, None)

    def accumulate(self, accumulator: ACC, *args):
        key, obj = args
        if (accumulator[0] is None) or (key > accumulator[0]):
            accumulator[0] = key
            accumulator[1] = obj

    def retract(self, accumulator: ACC, *args):
        pass

    def get_value(self, accumulator: ACC) -> T:
        return accumulator[1]


some_complex_inner_type = DataTypes.ROW(
    [
        DataTypes.FIELD("f0", DataTypes.STRING()),
        DataTypes.FIELD("f1", DataTypes.STRING())
    ]
)

some_complex_type = DataTypes.ROW(
    [
        DataTypes.FIELD(k, DataTypes.ARRAY(some_complex_inner_type))
        for k in ("f0", "f1", "f2")
    ]
    + [
        DataTypes.FIELD("f3", DataTypes.DATE()),
        DataTypes.FIELD("f4", DataTypes.VARCHAR(32)),
        DataTypes.FIELD("f5", DataTypes.VARCHAR(2)),
    ]
)

@udf(input_types=DataTypes.STRING(), result_type=some_complex_type)
def complex_udf(s):
    return Row(f0=None, f1=None, f2=None, f3=None, f4=None, f5=None)


if __name__ == "__main__":
    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(env)
    table_env.get_config().set('pipeline.classpaths', 'file:///Users/dianfu/code/src/workspace/pyflink-examples/flink-sql-connector-postgres-cdc-2.1.1.jar')

    # Create schema
    _schema = {
        "p_key": DataTypes.INT(False),
        "modified_id": DataTypes.INT(False),
        "content": DataTypes.STRING()
    }
    schema = Schema.new_builder().from_fields(
        *zip(*[(k, v) for k, v in _schema.items()])
    ).\
        primary_key("p_key").\
        build()

    # Create table descriptor
    descriptor = TableDescriptor.for_connector("postgres-cdc").\
        option("hostname", "host.docker.internal").\
        option("port", "5432").\
        option("database-name", "flink_issue").\
        option("username", "root").\
        option("password", "root").\
        option("debezium.plugin.name", "pgoutput").\
        option("schema-name", "flink_schema").\
        option("table-name", "flink_table").\
        option("slot.name", "flink_slot").\
        schema(schema).\
        build()

    table_env.create_temporary_table("flink_table", descriptor)

    # Create changelog stream
    stream = table_env.from_path("flink_table")

    # Define UDAF
    accumulator_type = DataTypes.ROW(
        [
            DataTypes.FIELD("f0", DataTypes.INT(False)),
            DataTypes.FIELD("f1", DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in _schema.items()])),
        ]
    )
    result_type = DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in _schema.items()])
    emit_last = udaf(EmitLastState(), accumulator_type=accumulator_type, result_type=result_type)

    # Emit last state based on modified_id to enable parallel processing
    stream = stream.\
        group_by(col("p_key")).\
        select(
            col("p_key"),
            emit_last(col("modified_id"), row(*(col(k) for k in _schema.keys())).cast(result_type)).alias("tmp_obj")
        )

    # Select the elements of the objects
    stream = stream.select(*(col("tmp_obj").get(k).alias(k) for k in _schema.keys()))

    # We apply a UDF which parses the XML and returns a complex nested structure
    stream = stream.select(col("p_key"), complex_udf(col("content")).alias("nested_obj"))

    # We select an element from the nested structure in order to flatten it
    # The next line causes the issue; commenting it out makes the pipeline work
    stream = stream.select(col("p_key"), col("nested_obj").get("f0"))

    # Interestingly, the line below does work...
    # stream = stream.select(col("nested_obj").get("f0"))

    table_env.to_changelog_stream(stream).print()

    # Execute
    env.execute_async()
{code}

The following exception is thrown:

{code}
py4j.protocol.Py4JJavaError: An error occurred while calling o8.toChangelogStream.
: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
        at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
        at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
        at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
        at java.base/java.util.Objects.checkIndex(Unknown Source)
        at java.base/java.util.ArrayList.get(Unknown Source)
        at org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:975)
        at org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:924)
        at org.apache.calcite.rex.RexLocalRef.accept(RexLocalRef.java:75)
        at org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:198)
        at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:904)
        at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:887)
        at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
        at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
        at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
        at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
        at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
        at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
        at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
        at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
        at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
        at org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:295)
        at org.apache.calcite.rex.RexProgramBuilder.addProject(RexProgramBuilder.java:206)
        at org.apache.calcite.rex.RexProgram.create(RexProgram.java:224)
        at org.apache.calcite.rex.RexProgram.create(RexProgram.java:193)
        at org.apache.flink.table.planner.plan.rules.logical.PythonCalcSplitRuleBase.onMatch(PythonCalcSplitRule.scala:98)
        at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
        at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
        at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
        at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
        at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.immutable.Range.foreach(Range.scala:155)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
        at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
        at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
        at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
        at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Unknown Source)
{code}
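
The failure does not seem to depend on the CDC source or the UDAF; the projection that trips {{PythonCalcSplitRuleBase}} is the combination of a pass-through column and a nested field access on the result of a Python UDF. A minimal sketch along those lines (an assumption — only the full job above was verified; {{from_elements}} stands in for the CDC table, and the result type is reduced to two fields):

{code}
from pyflink.common import Row
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf

# Same kind of ROW result type as above, reduced to two fields
result_type = DataTypes.ROW([
    DataTypes.FIELD("f0", DataTypes.STRING()),
    DataTypes.FIELD("f1", DataTypes.STRING()),
])

@udf(result_type=result_type)
def complex_udf(s):
    return Row(f0=None, f1=None)

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Stand-in source (assumption: the CDC connector is not needed to reproduce)
t = t_env.from_elements([(1, "a")], ["p_key", "content"])
t = t.select(col("p_key"), complex_udf(col("content")).alias("nested_obj"))

# Selecting another column alongside the nested field access is what
# should trigger the planner error; selecting only the nested field works.
t = t.select(col("p_key"), col("nested_obj").get("f0"))
t.execute().print()
{code}

As the comments in the job above note, dropping {{col("p_key")}} from the final select (so that only the nested field is accessed) makes the pipeline work, which points at the projection splitting performed by {{PythonCalcSplitRule}}.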


