Posted to dev@flink.apache.org by "Alexander Filipchik (Jira)" <ji...@apache.org> on 2020/07/15 04:22:00 UTC

[jira] [Created] (FLINK-18603) SQL fails with java.lang.IllegalStateException: No operators defined in streaming topology

Alexander Filipchik created FLINK-18603:
-------------------------------------------

             Summary: SQL fails with java.lang.IllegalStateException: No operators defined in streaming topology
                 Key: FLINK-18603
                 URL: https://issues.apache.org/jira/browse/FLINK-18603
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.11.0
            Reporter: Alexander Filipchik


Hi, I was trying out 1.11 and found that code that worked in 1.10.1 fails in 1.11.0 with:
{code:java}
Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
	at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
	at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
	at com.css.flink.avro.confluent.table.SqlTest.main(SqlTest.java:53)

{code}
Code example:
{code:java}
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

String createTable =
    String.format(
        "create table EnrichedOrders ("
            + "name VARCHAR,"
            + "proctime AS PROCTIME()"
            + ") with ("
            + "  'connector.type' = 'kafka',"
            + "  'connector.version' = 'universal',"
            + "  'connector.property-version' = '1',"
            + "  'connector.topic' = '%s',"
            + "  'connector.properties.bootstrap.servers' = '%s',"
            + "  'connector.properties.group.id' = '%s',"
            + "  'connector.startup-mode' = 'earliest-offset',"
            + "  'update-mode' = 'append',"
            + "  'format.type' = 'confluent-avro',"
            + "  'format.schema-registry' = '%s'"
            + ")",
        "avro",
        "broker",
        "testSqlLocal",
        "registry");

tEnv.executeSql(createTable);

tEnv.toAppendStream(
        tEnv.sqlQuery(
            "select name, count(*) "
                + "from EnrichedOrders "
                + "GROUP BY TUMBLE(proctime, INTERVAL '1' MINUTE), name"),
        Row.class)
    .print();

tEnv.execute("testSql");
{code}
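
A possible explanation, offered as an assumption rather than a confirmed diagnosis: since 1.11, a Table converted with toAppendStream() adds its operators to the StreamExecutionEnvironment, while tEnv.execute() only submits statements buffered in the table environment (for example via sqlUpdate()), so it finds nothing to run. The minimal sketch below submits the job through bsEnv.execute() instead. The class name SqlExecuteSketch, the in-memory Names view, and the sample values are hypothetical stand-ins for the Kafka table above; the sketch assumes the Flink 1.11 Blink planner and the Java table API bridge are on the classpath.
{code:java}
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Hypothetical class name, used only for illustration.
public class SqlExecuteSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings bsSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

    // Hypothetical in-memory source standing in for the Kafka-backed table.
    DataStream<String> names = bsEnv.fromElements("a", "b", "a");
    tEnv.createTemporaryView("Names", names, $("name"));

    Table result = tEnv.sqlQuery("select name from Names");

    // The conversion registers the query's operators on bsEnv.
    tEnv.toAppendStream(result, Row.class).print();

    // Assumption: in 1.11 the DataStream pipeline is submitted through
    // the StreamExecutionEnvironment rather than through tEnv.execute().
    bsEnv.execute("testSql");
  }
}
{code}
If this assumption holds, replacing the final tEnv.execute("testSql") call in the example above with bsEnv.execute("testSql") should be enough to avoid the exception.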

--
This message was sent by Atlassian Jira
(v8.3.4#803005)