Posted to issues@flink.apache.org by "Zhipeng Zhang (Jira)" <ji...@apache.org> on 2022/07/26 05:33:00 UTC

[jira] [Commented] (FLINK-23020) NullPointerException when running collect twice from Python API

    [ https://issues.apache.org/jira/browse/FLINK-23020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571200#comment-17571200 ] 

Zhipeng Zhang commented on FLINK-23020:
---------------------------------------

Another similar one: https://github.com/apache/flink-ml/runs/7421222114?check_suite_focus=true

> NullPointerException when running collect twice from Python API
> ---------------------------------------------------------------
>
>                 Key: FLINK-23020
>                 URL: https://issues.apache.org/jira/browse/FLINK-23020
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.1
>            Reporter: Maciej Bryński
>            Priority: Major
>
> Hi, 
> I'm trying to use PyFlink from a Jupyter notebook and I'm getting an NPE in the following scenario.
> 1. I create a datagen table:
> {code:java}
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> # Streaming table environment with the blink planner, parallelism 1.
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> table_env.get_config().get_configuration().set_integer("parallelism.default", 1)
>
> # Unbounded datagen source that keeps producing rows.
> table_env.execute_sql("DROP TABLE IF EXISTS datagen")
> table_env.execute_sql("""
> CREATE TABLE datagen (
>     id INT
> ) WITH (
>     'connector' = 'datagen'
> )
> """)
> {code}
> 2. Then I run collect:
> {code:java}
> try:
>     result = table_env.sql_query("select * from datagen limit 1").execute()
>     for r in result.collect():
>         print(r)
> except KeyboardInterrupt:
>     result.get_job_client().cancel()
> {code}
> 3. I press the "interrupt the kernel" button. This raises a KeyboardInterrupt, which is handled by the try/except above and cancels the query.
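> The same failure can presumably be reproduced without Jupyter by issuing the cancel from code directly. A minimal sketch, assuming the same table_env as above and that the job is still running when the cancel is issued:
> {code:python}
> # Run the query, then cancel it from code, mimicking what the
> # KeyboardInterrupt handler above does when the kernel is interrupted.
> result = table_env.sql_query("select * from datagen limit 1").execute()
> result.get_job_client().cancel()
>
> # Re-submitting the same query now fails with the NullPointerException
> # shown in step 4 below.
> table_env.sql_query("select * from datagen limit 1").execute()
> {code}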
> 4. I run the collect from step 2 one more time. Result:
> {code:java}
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-5-98ef93c07bdb> in <module>
>       1 try:
> ----> 2     result = table_env.sql_query("select * from datagen limit 1").execute()
>       3     for r in result.collect():
>       4         print(r)
>       5 except KeyboardInterrupt:
> /usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self)
>    1070         """
>    1071         self._t_env._before_execute()
> -> 1072         return TableResult(self._j_table.execute())
>    1073 
>    1074     def explain(self, *extra_details: ExplainDetail) -> str:
> /usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, *args)
>    1283 
>    1284         answer = self.gateway_client.send_command(command)
> -> 1285         return_value = get_return_value(
>    1286             answer, self.gateway_client, self.target_id, self.name)
>    1287 
> /usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, **kw)
>     144     def deco(*a, **kw):
>     145         try:
> --> 146             return f(*a, **kw)
>     147         except Py4JJavaError as e:
>     148             from pyflink.java_gateway import get_gateway
> /usr/local/lib/python3.8/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>     325             if answer[1] == REFERENCE_TYPE:
> --> 326                 raise Py4JJavaError(
>     327                     "An error occurred while calling {0}{1}{2}.\n".
>     328                     format(target_id, ".", name), value)
> Py4JJavaError: An error occurred while calling o69.execute.
> : java.lang.NullPointerException
> 	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
> 	at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:144)
> 	at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:108)
> 	at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:73)
> 	at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54)
> 	at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
> 	at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
> 	at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
> 	at org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965)
> 	at org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879)
> 	at org.apache.calcite.plan.hep.HepPlanner.contractVertices(HepPlanner.java:858)
> 	at org.apache.calcite.plan.hep.HepPlanner.applyTransformationResults(HepPlanner.java:745)
> 	at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:545)
> 	at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
> 	at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271)
> 	at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
> 	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
> 	at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> 	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
> 	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
> 	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:791)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
> 	at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
> 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> 	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> 	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> 	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> 	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> 	at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
>  
> PS. When I cancel the job from the Web UI, I'm able to run collect twice. The problem exists only when the job is cancelled from the code.
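> Since cancelling through the Web UI works, a possible (unverified) workaround is to cancel through the REST endpoint behind the Web UI's cancel button instead of JobClient.cancel(). A minimal sketch, assuming a JobManager REST endpoint on localhost:8081 and the requests library being available:
> {code:python}
> import requests
>
> # The job id of the running query, taken from its job client.
> job_id = str(result.get_job_client().get_job_id())
>
> # Flink's REST API terminates a job via PATCH /jobs/<jobID>?mode=cancel,
> # the same endpoint the Web UI's cancel button calls.
> requests.patch(f"http://localhost:8081/jobs/{job_id}?mode=cancel")
> {code}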



--
This message was sent by Atlassian Jira
(v8.20.10#820010)