Posted to user@flink.apache.org by maidangdang44 <ma...@126.com> on 2018/10/23 04:45:09 UTC
how to use the TUMBLE(time_attr, interval) window function in Flink SQL
Below is one record from my source; its body contains the user logs:
{
body: [
"user1,url1,2018-10-23 00:00:00;user2,url2,2018-10-23 00:01:00;user3,url3,2018-10-23 00:02:00"
]
}
I use LATERAL TABLE and a user-defined TableFunction to flat-map the source into a new table, log, and I want to group by time and username. Here is my code:
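import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Splits a body like "user1,url1,2018-10-23 00:00:00;user2,..." into one row per entry.
public class BodySplitFun extends TableFunction<Tuple3<String, String, Long>> {
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public void eval(Row bodyRow) {
        String body = bodyRow.getField(0).toString();
        for (String line : body.split(";")) {
            // Each entry is "user,url,yyyy-MM-dd HH:mm:ss"
            String[] fields = line.split(",");
            try {
                // Emit (user, url, epoch millis); sTime therefore ends up as a BIGINT column
                collect(new Tuple3<>(fields[0], fields[1], sdf.parse(fields[2]).getTime()));
            } catch (ParseException e) {
                throw new IllegalArgumentException("Cannot parse timestamp in entry: " + line, e);
            }
        }
    }
}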
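tblEnv.registerFunction("bodySplit", new BodySplitFun());
// SELECT statements go through sqlQuery(); in Flink 1.6, sqlUpdate() only accepts INSERT statements.
// A Java string literal cannot span several lines, so the query is concatenated.
Table result = tblEnv.sqlQuery(
    "SELECT COUNT(username) " +
    "FROM (" +
    "  SELECT username, url, sTime " +
    "  FROM mySource " +
    "  LEFT JOIN LATERAL TABLE(bodySplit(body)) AS T(username, url, sTime) ON TRUE" +
    ") log " +
    "GROUP BY TUMBLE(log.sTime, INTERVAL '1' MINUTE), log.username");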
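For reference, the result this query is meant to produce can be modelled in plain Java, without Flink: each body is split into (user, url, timestamp) rows the way BodySplitFun does, each timestamp is mapped to the 1-minute tumbling window that contains it (windows aligned to the epoch), and rows are counted per window and per user. This is only a sketch of the intended semantics; the class and method names are hypothetical, and the timestamps are assumed to be UTC.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (no Flink) of
// COUNT(username) ... GROUP BY TUMBLE(sTime, INTERVAL '1' MINUTE), username.
// Class and method names are hypothetical.
class TumbleCountSketch {
    static final long WINDOW_MS = 60_000L; // INTERVAL '1' MINUTE in milliseconds
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Assumption: the log timestamps are interpreted as UTC.
    static long toEpochMillis(String s) {
        return LocalDateTime.parse(s, FMT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Tumbling windows are aligned to the epoch: [start, start + WINDOW_MS).
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, WINDOW_MS);
    }

    // Splits "user,url,time;user,url,time;..." like BodySplitFun, then counts
    // rows per window start and per user.
    static Map<Long, Map<String, Long>> countPerWindowAndUser(String body) {
        Map<Long, Map<String, Long>> counts = new TreeMap<>();
        for (String line : body.split(";")) {
            String[] f = line.split(",");
            long start = windowStart(toEpochMillis(f[2]));
            counts.computeIfAbsent(start, k -> new TreeMap<>()).merge(f[0], 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String body = "user1,url1,2018-10-23 00:00:00;user2,url2,2018-10-23 00:01:00;"
                + "user3,url3,2018-10-23 00:02:00";
        // Prints each window start (UTC) with its per-user counts
        countPerWindowAndUser(body).forEach((start, perUser) ->
                System.out.println(LocalDateTime.ofEpochSecond(start / 1000, 0, ZoneOffset.UTC) + " " + perUser));
    }
}
```

With the sample body, every entry falls into its own minute, so each window holds a single user with a count of 1.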
When I run my program, I get this error message:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL DAY>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 49 more
How can I group by sTime in the table log?
Re: how to use the TUMBLE(time_attr, interval) window function in Flink SQL
Posted by maidangdang44 <ma...@126.com>.
This was very helpful. Thank you very much :)
maidangdang44
maidangdang44@126.com
On 10/23/2018 15:20, Dawid Wysakowicz <dw...@apache.org> wrote:
Re: how to use the TUMBLE(time_attr, interval) window function in Flink SQL
Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,
The problem is that sTime is not a time attribute [1]; a time attribute has to be aligned with the watermark mechanism. As far as I know, you currently cannot create a time attribute from within a TableFunction.
What you could do instead is perform the splitting in the DataStream API and register a proper table, with timestamps and watermarks assigned, in the TableEnvironment. You can then apply the windowing to a table prepared that way.
Best,
Dawid
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html#time-attributes
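The point that a time attribute must be tied to watermarks can be illustrated with a simplified plain-Java model (not Flink code; the class, fields, and timestamps below are hypothetical). Each event carries its own timestamp, the watermark trails the largest timestamp seen by a fixed out-of-orderness bound, a 1-minute tumbling window is emitted only once the watermark reaches its last millisecond, and events arriving after that are dropped. A plain BIGINT column produced by a TableFunction (the type the error message reports for sTime) carries none of this bookkeeping. In Flink 1.6, the suggested route would, as far as we can tell, look roughly like this: split the body in a DataStream flatMap, call assignTimestampsAndWatermarks (for example with a BoundedOutOfOrdernessTimestampExtractor), and register the stream with StreamTableEnvironment.registerDataStream using a field expression such as "username, url, sTime.rowtime", so that TUMBLE(sTime, ...) sees an event-time attribute. As a simplification, the sketch updates the watermark after every event, whereas Flink emits watermarks periodically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model (not Flink code) of event-time tumbling windows driven by
// bounded-out-of-orderness watermarks. Class and method names are hypothetical.
class WatermarkWindowSketch {
    final long windowMs;
    final long maxOutOfOrdernessMs;
    long maxTimestampSeen = Long.MIN_VALUE;
    // Open windows: window start -> (user -> count)
    final TreeMap<Long, Map<String, Long>> open = new TreeMap<>();
    // Results of fired windows, as "windowStart,user,count"
    final List<String> emitted = new ArrayList<>();

    WatermarkWindowSketch(long windowMs, long maxOutOfOrdernessMs) {
        this.windowMs = windowMs;
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Watermark = highest timestamp seen so far minus the allowed out-of-orderness.
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestampSeen - maxOutOfOrdernessMs;
    }

    void onEvent(String user, long ts) {
        long start = ts - Math.floorMod(ts, windowMs);
        long lastTsInWindow = start + windowMs - 1;
        if (lastTsInWindow <= currentWatermark()) {
            return; // the window has already fired, so this late event is dropped
        }
        open.computeIfAbsent(start, k -> new TreeMap<>()).merge(user, 1L, Long::sum);
        maxTimestampSeen = Math.max(maxTimestampSeen, ts);
        fireCompleteWindows();
    }

    // A window is emitted once the watermark reaches its last millisecond.
    void fireCompleteWindows() {
        long wm = currentWatermark();
        while (!open.isEmpty() && open.firstKey() + windowMs - 1 <= wm) {
            Map.Entry<Long, Map<String, Long>> window = open.pollFirstEntry();
            window.getValue().forEach((user, n) -> emitted.add(window.getKey() + "," + user + "," + n));
        }
    }

    public static void main(String[] args) {
        WatermarkWindowSketch s = new WatermarkWindowSketch(60_000L, 5_000L);
        s.onEvent("user1", 0L);
        s.onEvent("user2", 30_000L);
        s.onEvent("user1", 61_000L); // watermark 56000: window [0, 60000) is still open
        s.onEvent("user1", 66_000L); // watermark 61000: window [0, 60000) fires
        s.onEvent("user3", 10_000L); // late for [0, 60000): dropped
        System.out.println(s.emitted);
    }
}
```

The out-of-orderness bound trades latency for completeness: a larger bound delays every window's result but tolerates more disordered input before events are treated as late.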