Posted to user@flink.apache.org by Lu Weizheng <lu...@hotmail.com> on 2020/08/13 07:34:53 UTC

Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

Hi,

I am using Flink 1.11 SQL from Java. All my operations are in SQL: I create source tables and insert the results into sink tables. There are no other Java operators. I execute it in IntelliJ and I can see the final result in the sink tables. However, I get the following error. I am not sure whether it is a bug or there is something wrong in my code? Actually it does not affect the computation.

Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at com.flink.tutorials.java.projects.iot.IoTSQLDemo.main()

Here's my code:

        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);

        // create source and sink tables...

        tEnv.executeSql("INSERT INTO sensor_1min_avg " +
                "SELECT " +
                "  room, " +
                "  AVG(temp) AS avg_temp," +
                "  TUMBLE_END(ts, INTERVAL '1' MINUTE) AS end_ts " +
                "FROM sensor " +
                "GROUP BY room, TUMBLE(ts, INTERVAL '1' MINUTE)");

        env.execute("table api");



Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

Posted by Leonard Xu <xb...@gmail.com>.
Hi, Weizheng


> On Aug 13, 2020, at 19:44, Danny Chan <yu...@gmail.com> wrote:
> 
> tEnv.executeSql would execute the SQL asynchronously, e.g. submitting a job to the backend cluster with a builtin job name

`tEnv.executeSql` is an asynchronous method: it submits the job immediately and returns. If you are testing in your IDE, you should obtain the TableResult object and wait for the execution to finish, as in the following piece of code;
otherwise the `main()` method in your demo may exit before the execution has finished.

TableResult result = tableEnvironment.executeSql("insert into ... ");
// wait for the insert job to finish
result.getJobClient().get()
      .getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
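
Applied to the demo from your mail, the end of main() would then look roughly like the sketch below (untested, reusing your tEnv and assuming the same table definitions):

TableResult result = tEnv.executeSql("INSERT INTO sensor_1min_avg " +
        "SELECT " +
        "  room, " +
        "  AVG(temp) AS avg_temp, " +
        "  TUMBLE_END(ts, INTERVAL '1' MINUTE) AS end_ts " +
        "FROM sensor " +
        "GROUP BY room, TUMBLE(ts, INTERVAL '1' MINUTE)");

// block until the insert job has finished, so that main() does not exit early
result.getJobClient().get()
      .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
      .get();

// note: there is no env.execute(...) call afterwards -- the pipeline it would
// run is empty, which is exactly what the IllegalStateException complains about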

Best
Leonard

Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

Posted by Danny Chan <yu...@gmail.com>.
Weizheng ~

tEnv.executeSql executes the SQL asynchronously, i.e. it submits a job to the backend cluster with a built-in job name; the tEnv.executeSql call itself returns a TableResult immediately, with a constant affected-rows count of -1.
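
You can see this if you keep the returned TableResult (a rough sketch, using the names from the original demo):

TableResult result = tEnv.executeSql("INSERT INTO sensor_1min_avg SELECT ...");
// executeSql returns at once with the -1 affected-rows placeholder;
// the actual streaming job keeps running in the background
JobClient client = result.getJobClient().get();
System.out.println(client.getJobStatus().get());   // e.g. RUNNING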

Best,
Danny Chan
On Aug 13, 2020 at 3:46 PM +0800, Lu Weizheng <lu...@hotmail.com> wrote:
> Thanks Timo,
>
> So there is no need to use the execute() method in Flink SQL if I do all the things from source to sink in SQL.
>
> Best Regards,
> Lu
>
> > On Aug 13, 2020 at 3:41 PM, Timo Walther <tw...@apache.org> wrote:
> >
> > Hi Lu,
> >
> > `env.execute("table api");` is not necessary after FLIP-84 [1]. Every method that has `execute` in its name will immediately execute a job. Therefore your `env.execute` has an empty pipeline.
> >
> > Regards,
> > Timo
> >
> > [1] https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878

Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

Posted by Lu Weizheng <lu...@hotmail.com>.
Thanks Timo,

So there is no need to use the execute() method in Flink SQL if I do all the things from source to sink in SQL.

Best Regards,
Lu

> On Aug 13, 2020 at 3:41 PM, Timo Walther <tw...@apache.org> wrote:
> 
> Hi Lu,
> 
> `env.execute("table api");` is not necessary after FLIP-84 [1]. Every method that has `execute` in its name will immediately execute a job. Therefore your `env.execute` has an empty pipeline.
> 
> Regards,
> Timo
> 
> [1] https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> 


Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

Posted by Timo Walther <tw...@apache.org>.
Hi Lu,

`env.execute("table api");` is not necessary after FLIP-84 [1]. Every 
method that has `execute` in its name will immediately execute a job. 
Therefore your `env.execute` has an empty pipeline.
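
In other words (a minimal sketch of the relevant part):

// executeSql(...) plans and submits the INSERT job by itself,
// so no operators are ever added to the StreamExecutionEnvironment
tEnv.executeSql("INSERT INTO sensor_1min_avg SELECT ...");

// this call then sees an empty topology and throws
// "No operators defined in streaming topology. Cannot execute."
env.execute("table api");

Simply dropping the env.execute(...) call removes the exception; the INSERT job has already been submitted by executeSql.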

Regards,
Timo

[1] 
https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
