Posted to user@flink.apache.org by madhu phatak <ph...@gmail.com> on 2016/01/01 10:21:02 UTC

Getting executionplan in the local mode inside IDE

Hi,
I am trying to get the execution plan for a word count using the code below,
running in local mode inside IntelliJ IDEA. I am using Flink 0.10.0.

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val data = List("hi", "how are you", "hi")
val dataSet = env.fromCollection(data)
val words = dataSet.flatMap(value => value.split("\\s+"))
val mappedWords = words.map(value => (value, 1))
val grouped = mappedWords.groupBy(0)
val sum = grouped.sum(1)
sum.print()
println(env.getExecutionPlan())


The program computes the sum correctly, but fails with the following exception
on the last line:

Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:925)
    at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:95)
    at org.apache.flink.api.scala.ExecutionEnvironment.getExecutionPlan(ExecutionEnvironment.scala:635)
    at com.madhukaraphatak.flink.WordCount$.main(WordCount.scala:30)


I tried placing the getExecutionPlan call in different places, but I get the
same error. Is there any other way to get the execution plan in local mode?

-- 
Regards,
Madhukara Phatak
http://datamantra.io/

Re: Getting executionplan in the local mode inside IDE

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

You can only get the execution plan for programs that have a data sink and
haven't been executed yet. In your code, print() defines the data sink, but
it also eagerly executes the program. After execution, the program is
"removed" from the execution environment, so Flink complains that no new
sink has been defined.

You can print the execution plan if you use a data sink that does not
eagerly execute. For example, you can replace the sum.print() statement
with sum.output(new DiscardingOutputFormat()) or
sum.writeAsText("file:///some/path").
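To make the suggestion concrete, here is a minimal sketch of the word count with a lazy sink, assuming the Flink 0.10 DataSet Scala API. The object name WordCountPlan and the helper buildPlan are hypothetical names chosen for illustration. The only material change from the original program is that print() is replaced by output(...) with a DiscardingOutputFormat, so a sink is registered without triggering execution.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.java.io.DiscardingOutputFormat

// Hypothetical wrapper object; the name is for illustration only.
object WordCountPlan {

  // Builds the word count job and returns its execution plan as a JSON string.
  def buildPlan(): String = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val data = List("hi", "how are you", "hi")

    val counts = env.fromCollection(data)
      .flatMap(value => value.split("\\s+"))
      .map(value => (value, 1))
      .groupBy(0)
      .sum(1)

    // Lazy sink: defines an output for the program but does not execute it,
    // unlike print(), count(), or collect().
    counts.output(new DiscardingOutputFormat[(String, Int)])

    // A sink exists and nothing has been executed yet, so the plan is available.
    env.getExecutionPlan()
  }

  def main(args: Array[String]): Unit = {
    println(buildPlan())
  }
}
```

The printed string is the JSON form of the plan. Because the discarding sink throws the results away, this variant is useful only for inspecting the plan, not for seeing the word counts.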

Cheers, Fabian


