You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Spongebob (Jira)" <ji...@apache.org> on 2022/12/24 10:06:00 UTC

[jira] [Updated] (FLINK-30500) Got an null pointer exception when using table environment in sub-thread

     [ https://issues.apache.org/jira/browse/FLINK-30500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Spongebob updated FLINK-30500:
------------------------------
    Description: 
I can use the `addInsertSql` function nornally in norm situation, but when I create the table env in main thread and then run this function in sub-thread, I got an ambigious exception that was thrown by calcite.

 
{code:java}
public static void main(String[] args) throws InterruptedException {
    TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
    String ddl = "CREATE TABLE IF NOT EXISTS SOURCE(" +
            "A DECIMAL(19)," +
            "B DECIMAL(19)" +
            ") WITH (" +
            "'connector' = 'jdbc'...)";
    String ddl2 = "CREATE TABLE IF NOT EXISTS SINK(" +
            "A DECIMAL(19)," +
            "B DECIMAL(19)" +
            ") WITH (" +
            "'connector' = 'print')";
    tableEnvironment.executeSql(ddl);
    tableEnvironment.executeSql(ddl2);
    StatementSet statementSet = tableEnvironment.createStatementSet();
    CompletableFuture.runAsync(() -> {
        statementSet.addInsertSql("INSERT INTO SINK SELECT * FROM SOURCE");
        try {
            statementSet.execute().await();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }).exceptionally((e) -> {
        e.printStackTrace();
        return null;
    });

    Thread.sleep(10000);
} {code}
 
{code:java}
Caused by: java.lang.NullPointerException
    at java.util.Objects.requireNonNull(Objects.java:203)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:144)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:108)
    at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:78)
    at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:59)
    at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
    at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
    at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
    at org.apache.calcite.rel.logical.LogicalFilter.create(LogicalFilter.java:108)
    at org.apache.calcite.rel.core.RelFactories$FilterFactoryImpl.createFilter(RelFactories.java:344)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertWhere(SqlToRelConverter.java:1042)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:666)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2866)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:639)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:290)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:53) {code}

  was:
I can use the `addInsertSql` function nornally in norm situation, but when I create the table env in main thread and then run this function in sub-thread, I got an ambigious exception that was thrown by calcite.

 
{code:java}
Caused by: java.lang.NullPointerException
    at java.util.Objects.requireNonNull(Objects.java:203)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:144)
    at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:108)
    at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:78)
    at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:59)
    at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
    at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
    at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
    at org.apache.calcite.rel.logical.LogicalFilter.create(LogicalFilter.java:108)
    at org.apache.calcite.rel.core.RelFactories$FilterFactoryImpl.createFilter(RelFactories.java:344)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertWhere(SqlToRelConverter.java:1042)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:666)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2866)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:639)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:290)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:53) {code}


> Got an null pointer exception when using table environment in sub-thread
> ------------------------------------------------------------------------
>
>                 Key: FLINK-30500
>                 URL: https://issues.apache.org/jira/browse/FLINK-30500
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.3
>            Reporter: Spongebob
>            Priority: Major
>
> I can use the `addInsertSql` function nornally in norm situation, but when I create the table env in main thread and then run this function in sub-thread, I got an ambigious exception that was thrown by calcite.
>  
> {code:java}
> public static void main(String[] args) throws InterruptedException {
>     TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
>     String ddl = "CREATE TABLE IF NOT EXISTS SOURCE(" +
>             "A DECIMAL(19)," +
>             "B DECIMAL(19)" +
>             ") WITH (" +
>             "'connector' = 'jdbc'...)";
>     String ddl2 = "CREATE TABLE IF NOT EXISTS SINK(" +
>             "A DECIMAL(19)," +
>             "B DECIMAL(19)" +
>             ") WITH (" +
>             "'connector' = 'print')";
>     tableEnvironment.executeSql(ddl);
>     tableEnvironment.executeSql(ddl2);
>     StatementSet statementSet = tableEnvironment.createStatementSet();
>     CompletableFuture.runAsync(() -> {
>         statementSet.addInsertSql("INSERT INTO SINK SELECT * FROM SOURCE");
>         try {
>             statementSet.execute().await();
>         } catch (InterruptedException | ExecutionException e) {
>             throw new RuntimeException(e);
>         }
>     }).exceptionally((e) -> {
>         e.printStackTrace();
>         return null;
>     });
>     Thread.sleep(10000);
> } {code}
>  
> {code:java}
> Caused by: java.lang.NullPointerException
>     at java.util.Objects.requireNonNull(Objects.java:203)
>     at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:144)
>     at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:108)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:78)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:59)
>     at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
>     at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
>     at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
>     at org.apache.calcite.rel.logical.LogicalFilter.create(LogicalFilter.java:108)
>     at org.apache.calcite.rel.core.RelFactories$FilterFactoryImpl.createFilter(RelFactories.java:344)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertWhere(SqlToRelConverter.java:1042)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:666)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2866)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:639)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:290)
>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>     at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:53) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)