Posted to user@flink.apache.org by Theo Diefenthal <th...@scoop-software.de> on 2020/06/04 16:18:18 UTC

SQL Expression to Flink FilterFunction?

Hi there, 

I have a full DataStream pipeline (i.e., no higher-level APIs like Table or SQL are involved). In the middle of the pipeline, I now need to filter on a SQL WHERE expression, e.g. "user = 'pitter' AND age > 10", which can include REGEXP and arbitrarily nested AND/OR constructs.

I was wondering if I could somehow transform a SQL WHERE expression into a Flink FilterFunction? 

My approach right now is to register my stream as a table, run a SQL query on it, and convert the result back to a DataStream, like so:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

List<SomePOJO> data = createPOJOTestData();
DataStream<SomePOJO> stream = env.fromCollection(data);

//final Table asTable = tEnv.fromDataStream(stream);
//Table filteredTable = asTable.where("user === 'pitter' && age > 10"); // NO SQL style 'AND' possible here...

tEnv.registerDataStream("SAMPLE", stream);
Table filteredTable = tEnv.sqlQuery("SELECT * FROM SAMPLE WHERE user = 'pitter' AND age > 10");

stream = tEnv.toAppendStream(filteredTable, SomePOJO.class);
List<SomePOJO> list = IteratorUtils.toList(DataStreamUtils.collect(stream));
//... test assertions

It feels a bit weird that I need to go all the way up to the SQL API and register a table "just" to apply a WHERE clause, and that, once I leave the DataStream world, I can no longer assign a uid or operator name to this operator.

Is the way I wrote it the best approach, or do you have a better idea? Are there any caveats here? Note that I didn't assign the event time column on purpose: I know it's just a WHERE without any windowing etc., and I wanted to test that it still works without an explicit time column :)

Best regards 
Theo 
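
For comparison, the kind of filter the question is after can be written by hand for the single example expression. This is a minimal sketch: `SomePOJO` here is a hypothetical stand-in with only `user` and `age` fields, and it uses `java.util.function.Predicate` so that it runs without Flink. In a Flink job the same boolean body would go into the `filter(T value)` method of a `FilterFunction<SomePOJO>`, and the operator returned by `DataStream#filter` can then be given `.uid(...)` and `.name(...)`.

```java
import java.util.function.Predicate;

public final class PitterFilter {
    // Hypothetical stand-in for the question's SomePOJO; only the two filtered fields are modeled.
    public static final class SomePOJO {
        public String user;
        public int age;

        public SomePOJO() {}

        public SomePOJO(String user, int age) {
            this.user = user;
            this.age = age;
        }
    }

    // Hand-written equivalent of: WHERE user = 'pitter' AND age > 10.
    // A null user yields false, matching SQL, where NULL = 'pitter' is UNKNOWN and the row is dropped.
    public static final Predicate<SomePOJO> USER_IS_PITTER_AND_OLDER_THAN_10 =
            pojo -> "pitter".equals(pojo.user) && pojo.age > 10;

    private PitterFilter() {}
}
```

The limitation is the one raised in the question: every new WHERE expression needs new, compiled code.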


Re: SQL Expression to Flink FilterFunction?

Posted by Jark Wu <im...@gmail.com>.
This is possible but may need some development. There is a similar utility in
the table tests called
`org.apache.flink.table.expressions.utils.ExpressionTestBase` [1]; it
converts/translates expressions (either Table API expressions or SQL
expressions) into a MapFunction.

I think you can follow the approach of ExpressionTestBase to translate the
expression into a FilterFunction instead.

Best,
Jark

[1]:
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
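
Jark's suggestion reuses the planner's own expression translation, so it would keep Flink's SQL semantics. A much simpler and clearly different technique, practical only when the accepted WHERE syntax is small and known in advance, is to parse the expression by hand into a `java.util.function.Predicate` once and evaluate it per record. The sketch below makes several assumptions: the class `WherePredicate` is hypothetical (not a Flink API); rows are `Map<String, Object>`; only AND/OR (AND binding tighter), parentheses, `=`, `<>`, `<`, `<=`, `>`, `>=` against string or unsigned numeric literals, and `REGEXP(field, 'regex')` are supported, the last modeled loosely on Flink's `REGEXP(string1, string2)` function, which is TRUE if any substring matches; a NULL or missing field simply makes the comparison false, and comparing a string column with a number (or vice versa) throws a ClassCastException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Flink API): turns a tiny SQL WHERE subset into a Predicate over Map rows.
public final class WherePredicate {
    // One token: 'string' ('' escapes a quote), unsigned number, identifier/keyword, or operator.
    private static final Pattern TOKEN = Pattern.compile(
            "\\s*('(?:[^']|'')*'|\\d+(?:\\.\\d+)?|[A-Za-z_][A-Za-z0-9_]*|<>|<=|>=|[()=<>,])\\s*");
    private static final List<String> OPS = List.of("=", "<>", "<", "<=", ">", ">=");
    private final List<String> tokens = new ArrayList<>();
    private int pos = 0;

    private WherePredicate(String where) {
        Matcher m = TOKEN.matcher(where);
        for (int i = 0; i < where.length(); i = m.end()) {
            if (!m.region(i, where.length()).lookingAt()) throw new IllegalArgumentException("Cannot tokenize at offset " + i);
            tokens.add(m.group(1));
        }
    }

    // Parses once; the returned Predicate is then evaluated per record.
    public static Predicate<Map<String, Object>> parse(String where) {
        WherePredicate p = new WherePredicate(where);
        Predicate<Map<String, Object>> result = p.orExpr();
        if (p.pos != p.tokens.size()) throw new IllegalArgumentException("Unexpected token: " + p.tokens.get(p.pos));
        return result;
    }

    // orExpr := andExpr (OR andExpr)*; andExpr := primary (AND primary)*. AND binds tighter, as in SQL.
    private Predicate<Map<String, Object>> orExpr() {
        Predicate<Map<String, Object>> left = andExpr();
        while (accept("OR")) left = left.or(andExpr());
        return left;
    }
    private Predicate<Map<String, Object>> andExpr() {
        Predicate<Map<String, Object>> left = primary();
        while (accept("AND")) left = left.and(primary());
        return left;
    }

    // primary := '(' orExpr ')' | REGEXP '(' field ',' 'regex' ')' | field op literal
    private Predicate<Map<String, Object>> primary() {
        if (accept("(")) {
            Predicate<Map<String, Object>> inner = orExpr();
            expect(")");
            return inner;
        }
        String field = next();
        if (field.equalsIgnoreCase("REGEXP") && accept("(")) {
            String column = next();
            expect(",");
            Pattern regex = Pattern.compile(stringLiteral(next()));
            expect(")");
            return row -> row.get(column) instanceof String && regex.matcher((String) row.get(column)).find();
        }
        String op = next();
        if (!OPS.contains(op)) throw new IllegalArgumentException("Unsupported operator: " + op);
        String tok = next();
        Object literal = tok.startsWith("'") ? stringLiteral(tok) : Double.valueOf(tok); // numbers as doubles
        return row -> compare(row.get(field), op, literal);
    }

    private static String stringLiteral(String tok) {
        if (!tok.startsWith("'")) throw new IllegalArgumentException("Expected a string literal: " + tok);
        return tok.substring(1, tok.length() - 1).replace("''", "'");
    }
    private static boolean compare(Object value, String op, Object literal) {
        if (value == null) return false; // simplification: SQL yields UNKNOWN, which WHERE also drops
        int c = literal instanceof Double // numeric literal vs number; otherwise string vs string
                ? Double.compare(((Number) value).doubleValue(), (Double) literal)
                : ((String) value).compareTo((String) literal);
        switch (op) {
            case "=": return c == 0;
            case "<>": return c != 0;
            case "<": return c < 0;
            case "<=": return c <= 0;
            case ">": return c > 0;
            default: return c >= 0; // ">=": operators were validated while parsing
        }
    }

    private boolean accept(String t) {
        boolean ok = pos < tokens.size() && tokens.get(pos).equalsIgnoreCase(t);
        if (ok) pos++;
        return ok;
    }
    private void expect(String t) {
        if (!accept(t)) throw new IllegalArgumentException("Expected '" + t + "' at token " + pos);
    }
    private String next() {
        if (pos >= tokens.size()) throw new IllegalArgumentException("Unexpected end of expression");
        return tokens.get(pos++);
    }
}
```

Within a Flink job the predicate would be applied through `DataStream#filter`, which keeps `.uid(...)` and `.name(...)` available. Two caveats: the returned `Predicate` is built from lambdas and is not `Serializable`, so a `RichFilterFunction` that stores only the expression string and calls `WherePredicate.parse(...)` in `open(...)` is the safer shape; and a POJO such as `SomePOJO` would first need mapping to a `Map` (or a reflective field lookup).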

On Fri, 5 Jun 2020 at 10:17, Leonard Xu <xb...@gmail.com> wrote:

> Hi, Theo
>
> Currently, as far as I understand, this is hard to do in your DataStream
> application, because the conversion of a SQL expression into a Flink operator
> happens in the underlying table planner (more precisely, in the code
> generation phase), and the planner does not expose an interface for this to
> users, so you cannot assign an operator name or operator ID.
>
> Best,
> Leonard Xu

Re: SQL Expression to Flink FilterFunction?

Posted by Leonard Xu <xb...@gmail.com>.
Hi, Theo

Currently, as far as I understand, this is hard to do in your DataStream application, because the conversion of a SQL expression into a Flink operator happens in the underlying table planner (more precisely, in the code generation phase), and the planner does not expose an interface for this to users, so you cannot assign an operator name or operator ID.

Best,
Leonard Xu 
