Posted to user@flink.apache.org by Niklas Teichmann <ma...@studserv.uni-leipzig.de> on 2018/11/20 16:13:12 UTC
Flink Table Duplicate Evaluation
Hi everybody,
I have a question concerning the Flink Table API, more precisely the
way the results of Table API statements are evaluated. In the following
code example, the statement defining the table t1 is evaluated twice,
which causes both performance and correctness problems in the
program I am trying to write.
List<Long> longList = Arrays.asList(1L, 2L, 3L, 4L, 5L);
DataSet<Long> longDataSet =
    getExecutionEnvironment().fromCollection(longList);

tenv.registerDataSet("longs", longDataSet, "l");
tenv.registerFunction("time", new Time()); // an example UDF that evaluates the current time

Table t1 = tenv.scan("longs");
t1 = t1.select("l, time() as t");

Table t2 = t1.as("l1, id1");
Table t3 = t1.as("l2, id2");

Table t4 = t2.join(t3).where("l1 == l2");

t4.writeToSink(new PrintTableSink()); // a sink that prints the content of the table
I realize that this behaviour is defined in the documentation ("A
registered Table is treated similarly to a VIEW ...") and probably
stems from the DataStream API. But is there a preferred way to avoid
this?
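To make the VIEW analogy concrete, here is a plain-Java model of it (not Flink code; `ViewVsMaterialized`, `udf` and `defineT1` are hypothetical names). The statement defining t1 is modelled as a `Supplier` that re-runs on every reference, so the two references t2 and t3 evaluate the UDF twice per row, whereas materializing t1 once evaluates it once per row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class ViewVsMaterialized {

    // Counts invocations of a hypothetical stand-in for the time() UDF.
    static int udfCalls = 0;

    static long udf() {
        udfCalls++;
        return udfCalls; // a different value on every call, like a clock
    }

    // Evaluates "select l, udf() as t" over the input, like the statement defining t1.
    static List<long[]> defineT1(List<Long> input) {
        List<long[]> rows = new ArrayList<>();
        for (long l : input) {
            rows.add(new long[] {l, udf()});
        }
        return rows;
    }

    // View-like: each reference (t2, t3) re-runs the defining statement.
    static int callsWithView(List<Long> input) {
        udfCalls = 0;
        Supplier<List<long[]>> t1 = () -> defineT1(input);
        List<long[]> t2 = t1.get();
        List<long[]> t3 = t1.get();
        return udfCalls;
    }

    // Materialized: the statement runs once and both references share the result.
    static int callsWithMaterialized(List<Long> input) {
        udfCalls = 0;
        List<long[]> t1 = defineT1(input);
        List<long[]> t2 = t1;
        List<long[]> t3 = t1;
        return udfCalls;
    }

    public static void main(String[] args) {
        List<Long> longs = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        System.out.println(callsWithView(longs));         // 10
        System.out.println(callsWithMaterialized(longs)); // 5
    }
}
```

In the view-like case, the t values on the two sides of a self-join on l come from different UDF calls, which is where the logic issues come from.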
Currently I'm using a workaround that defines a TableSink which in
turn registers its output as a new table. That seems extremely hacky
though.
Sorry if I missed something obvious!
All the best,
Niklas
--
Re: Flink Table Duplicate Evaluation
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Niklas,
The workaround that you described should work fine.
However, you don't need a custom sink.
Converting the Table into a DataSet and registering the DataSet again as a
Table is currently the way to solve this issue.
Best, Fabian
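A minimal sketch of the conversion described above, assuming the Flink 1.7 Java batch Table API (`TableEnvironment.getTableEnvironment`, `BatchTableEnvironment.toDataSet`, `registerDataSet`, `scan`). `MaterializeOnce`, `NowNanos`, `nowNanos` and `t1Materialized` are hypothetical names. `NowNanos` stands in for the `Time` UDF from the question and is declared non-deterministic so the planner should not fold the call into a constant. Both join inputs scan the same registered DataSet, so the UDF should run once per input row and each joined row should carry the same `t` value on both sides.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

public class MaterializeOnce {

    // Hypothetical stand-in for the Time UDF: returns a fresh value on every call.
    public static class NowNanos extends ScalarFunction {
        public long eval() {
            return System.nanoTime();
        }

        // Non-deterministic, so the planner does not reduce the call to a constant.
        @Override
        public boolean isDeterministic() {
            return false;
        }
    }

    public static List<Row> run() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tenv = TableEnvironment.getTableEnvironment(env);

        List<Long> longList = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        tenv.registerDataSet("longs", env.fromCollection(longList), "l");
        tenv.registerFunction("nowNanos", new NowNanos());

        Table t1 = tenv.scan("longs").select("l, nowNanos() as t");

        // Translate t1 into a DataSet once and register that DataSet as a new table.
        // Every scan of "t1Materialized" reads the output of the same DataSet operator.
        DataSet<Row> t1Rows = tenv.toDataSet(t1, Row.class);
        tenv.registerDataSet("t1Materialized", t1Rows, "l, t");
        Table t1m = tenv.scan("t1Materialized");

        Table t2 = t1m.as("l1, id1");
        Table t3 = t1m.as("l2, id2");
        Table t4 = t2.join(t3).where("l1 == l2");

        // collect() executes the job and returns the joined rows (l1, id1, l2, id2).
        return tenv.toDataSet(t4, Row.class).collect();
    }

    public static void main(String[] args) throws Exception {
        for (Row row : run()) {
            System.out.println(row);
        }
    }
}
```

For the original program, `t4.writeToSink(new PrintTableSink())` can take the place of the final `collect()` call; the conversion of t1 is the only change needed.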