Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2017/09/08 15:55:59 UTC

Table API and registration of DataSet/DataStream

Hi all,
I have a question about the Table API.
Let's say my code is something like:


StreamTableEnvironment te = ...;
RowTypeInfo rtf = new RowTypeInfo(...);
DataStream<Row> myDs = ...;
te.registerDataStream("test", myDs, columnNames);

Table table = te.sql("SELECT *, (NAME = 'John') AS VALID FROM test WHERE ...");
myDs = te.toDataStream(table.where("VALID").select(columnNames), rtf);

If I do:

DataStream<Row> res = te.toDataStream(te.sql("SELECT * FROM test"), rtf);

I'd like res to take its data from the latest version of myDs. Is this
program correct?
Or should I override the "test" table in the TableEnvironment? Is that
possible? I don't see any API that allows this.

Best,
Flavio

Re: Table API and registration of DataSet/DataStream

Posted by Flavio Pompermaier <po...@okkam.it>.
I see... Anyway, I still find it very misleading to have a different
syntax for where clauses (SQL vs. Scala).
Why not make them compatible? Is it that complex?


On Thu, Sep 14, 2017 at 4:26 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Flavio,
>
> 1) The Java Table API does not aim to resemble SQL but rather the Scala
> Table API, which is integrated with its host language (Scala).
> Hence the different syntax for expressions.
>
> 2) Yes, that would be one way to do it. If that adds too much boilerplate
> code, you could encapsulate the code in your own helper class.
> We do not provide a TableUtils class, because this is out of scope of the
> Table API.
> It would be a bit of effort to make this generic for different data types
> because the DataSet can be of any type (Tuple, Pojo, Row, etc.) and would
> not be used in the Table API anyway.
>
> Best, Fabian

Re: Table API and registration of DataSet/DataStream

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Flavio,

1) The Java Table API does not aim to resemble SQL but rather the Scala
Table API, which is integrated with its host language (Scala).
Hence the different syntax for expressions.

2) Yes, that would be one way to do it. If that adds too much boilerplate
code, you could encapsulate the code in your own helper class.
We do not provide a TableUtils class, because this is out of scope of the
Table API.
It would be a bit of effort to make this generic for different data types
because the DataSet can be of any type (Tuple, Pojo, Row, etc.) and would
not be used in the Table API anyway.

Best, Fabian




2017-09-14 16:12 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> Hi Fabian,
> basically these were my problems with Table API.
>
> 1) TableEnvironment.sql() has a different where syntax than Table.where(),
> and this is very annoying (IMHO). Ex:
>   TableEnvironment.sql("SELECT * FROM XXX WHERE Y IS NOT NULL") vs
>   Table.where("Y.isNotNull").
>
> 2) If I understood correctly, instead of a program that ideally could be
> something like:
>
> DataSet<Row> ds = ....filter(TableUtils.getWhereAsFilter(ds, fieldTypes,
>     fieldNames, "Y IS NOT NULL"));
>
> I should do:
>
> BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
> // why not support an array of field names?
> Table table = tEnv.fromDataSet(ds, fieldNames);
> ds = tEnv.toDataSet(table.where("Y.isNotNull"),
>     new RowTypeInfo(fieldTypes));
>
> Is this correct?
> Moreover, fromDataSet requires fieldNames to be a comma-separated String.
> Why not also support fieldNames as String[]?
>
> Best,
> Flavio

Re: Table API and registration of DataSet/DataStream

Posted by Flavio Pompermaier <po...@okkam.it>.
Hi Fabian,
basically these were my problems with Table API.

1) TableEnvironment.sql() has a different where syntax than Table.where(),
and this is very annoying (IMHO). Ex:
  TableEnvironment.sql("SELECT * FROM XXX WHERE Y IS NOT NULL") vs
  Table.where("Y.isNotNull").

2) If I understood correctly, instead of a program that ideally could be
something like:

DataSet<Row> ds = ....filter(TableUtils.getWhereAsFilter(ds, fieldTypes,
    fieldNames, "Y IS NOT NULL"));

I should do:

BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
// why not support an array of field names?
Table table = tEnv.fromDataSet(ds, fieldNames);
ds = tEnv.toDataSet(table.where("Y.isNotNull"),
    new RowTypeInfo(fieldTypes));

Is this correct?
Moreover, fromDataSet requires fieldNames to be a comma-separated String.
Why not also support fieldNames as String[]?
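
A minimal workaround sketch for the String[] case, assuming the field names
are plain identifiers: join the array into the comma-separated form before
passing it to fromDataSet. The class and method names (FieldNames,
toFieldExpression) are hypothetical and not part of Flink.

```java
final class FieldNames {
    // Hypothetical helper: turns {"NAME", "AGE"} into "NAME, AGE",
    // the comma-separated form expected for the field list.
    static String toFieldExpression(String... fieldNames) {
        return String.join(", ", fieldNames);
    }

    public static void main(String[] args) {
        String[] fieldNames = {"NAME", "AGE", "Y"};
        System.out.println(toFieldExpression(fieldNames)); // prints "NAME, AGE, Y"
    }
}
```

The result could then be passed as the second argument of fromDataSet in the
snippet above.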

Best,
Flavio


On Thu, Sep 14, 2017 at 3:43 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Not sure what you mean by "translate a where clause to a filter function".
>
> Isn't that exactly what Table.filter(String condition) is doing?
> It translates a SQL-like condition (represented as String) into an
> operator that filters the Table.

Re: Table API and registration of DataSet/DataStream

Posted by Fabian Hueske <fh...@gmail.com>.
Not sure what you mean by "translate a where clause to a filter function".

Isn't that exactly what Table.filter(String condition) is doing?
It translates a SQL-like condition (represented as String) into an operator
that filters the Table.
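
To illustrate the general idea of turning a condition string into a filter,
here is a toy sketch in plain Java. It is not how Flink parses expressions;
it only handles the two forms "<field>.isNull" and "<field>.isNotNull" on
rows modeled as maps, and the names ConditionSketch and toFilter are made up
for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

final class ConditionSketch {
    // Hypothetical translator: "<field>.isNull" / "<field>.isNotNull" -> row predicate.
    static Predicate<Map<String, Object>> toFilter(String condition) {
        int dot = condition.lastIndexOf('.');
        if (dot < 0) {
            throw new IllegalArgumentException("Unsupported condition: " + condition);
        }
        String field = condition.substring(0, dot).trim();
        String op = condition.substring(dot + 1).trim();
        switch (op) {
            case "isNull":
                return row -> row.get(field) == null;
            case "isNotNull":
                return row -> row.get(field) != null;
            default:
                throw new IllegalArgumentException("Unsupported condition: " + condition);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("Y", 42);
        Predicate<Map<String, Object>> filter = toFilter("Y.isNotNull");
        System.out.println(filter.test(row)); // prints "true"
    }
}
```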


2017-09-09 23:49 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> Yes I can do that of course.
> What I need is basically the possibility to translate a where clause to a
> filter function. Is there any utility class that does that in Flink?

Re: Table API and registration of DataSet/DataStream

Posted by Flavio Pompermaier <po...@okkam.it>.
Yes I can do that of course.
What I need is basically the possibility to translate a where clause to a
filter function. Is there any utility class that does that in Flink?

On 9 Sep 2017 21:54, "Fabian Hueske" <fh...@gmail.com> wrote:

> Hi Flavio,
>
> I tried to follow your example. If I got it right, you would like to
> change the registered table by assigning a different DataStream to the
> original myDs variable.
>
> With registerDataStream("test", myDs, ...) you don't register the variable
> myDs as a table but its current value, i.e., a reference to a DataStream
> object.
> By changing the value of myDs, you just override the reference in myDs but
> do not change the reference that was registered in Calcite's catalog.
> This is common behavior in many programming languages including Java.
>
> Right now, there is no way to change or override a registered table. We
> had this functionality once, but had to remove it after a Calcite version
> upgrade.
> Can you use a new TableEnvironment and register the new table there?
>
> Best, Fabian

Re: Table API and registration of DataSet/DataStream

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Flavio,

I tried to follow your example. If I got it right, you would like to change
the registered table by assigning a different DataStream to the original
myDs variable.

With registerDataStream("test", myDs, ...) you don't register the variable
myDs as a table but its current value, i.e., a reference to a DataStream
object.
By changing the value of myDs, you just override the reference in myDs but
do not change the reference that was registered in Calcite's catalog.
This is common behavior in many programming languages including Java.
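
The following plain-Java sketch shows the same effect without Flink. The
CATALOG map and the register/lookup helpers are hypothetical stand-ins for
the table environment, and a String stands in for the DataStream.

```java
import java.util.HashMap;
import java.util.Map;

final class RegistrationDemo {
    // Hypothetical stand-in for the catalog: name -> registered object.
    private static final Map<String, Object> CATALOG = new HashMap<>();

    // Stores the object the argument refers to at call time,
    // not the caller's variable.
    static void register(String name, Object value) {
        CATALOG.put(name, value);
    }

    static Object lookup(String name) {
        return CATALOG.get(name);
    }

    public static void main(String[] args) {
        String myDs = "original stream";
        register("test", myDs);

        // Reassigning the local variable does not touch the catalog entry.
        myDs = "filtered stream";

        System.out.println(lookup("test")); // prints "original stream"
    }
}
```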

Right now, there is no way to change or override a registered table. We had
this functionality once, but had to remove it after a Calcite version
upgrade.
Can you use a new TableEnvironment and register the new table there?

Best, Fabian
