You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by jack <ws...@163.com> on 2020/06/19 02:08:25 UTC
pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中
使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中?
场景:使用pyflink通过filter进行条件过滤后插入到sink中,
比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中:
{
"logType":"syslog",
"message":"sla;flkdsjf"
}
{
"logType":"alarm",
"message":"sla;flkdsjf"
}
t_env.from_path("source")\
.filter("logType=syslog")\
.insert_into("sink1")
有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗:
if logType=="syslog":
insert_into(sink1)
elif logType=="alarm":
insert_into(sink2)
如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下:
t_env.from_path("source")\
.filter("logType=syslog")\
.insert_into("sink1")\
.filter("logType=alarm")\
.insert_into("sink2")
请各位大牛指点,感谢
Re:Re: pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中
Posted by jack <ws...@163.com>.
您好,jincheng老师,我已经验证了您提供的这种分开处理的逻辑,可以解决我的问题,非常感谢您的解惑
Best,
Jack
在 2020-06-22 14:28:04,"jincheng sun" <su...@gmail.com> 写道:
您好,jack:
Table API 不用 if/else 直接用类似逻辑即可:
val t1 = table.filter('x > 2).groupBy(..)
val t2 = table.filter('x <= 2).groupBy(..)
t1.insert_into("sink1)
t2.insert_into("sink2")
Best,
Jincheng
jack <ws...@163.com> 于2020年6月19日周五 上午10:35写道:
测试使用如下结构:
table= t_env.from_path("source")
if table.filter("logType=syslog"):
table.filter("logType=syslog").insert_into("sink1")
elif table.filter("logType=alarm"):
table.filter("logType=alarm").insert_into("sink2")
我测试了下,好像table.filter("logType=syslog").insert_into("sink1")生效,下面的elif不生效,原因是table.filter("logType=syslog")或者table.where在做条件判断的同时已经将数据进行过滤,走不到下面的分支??
在 2020-06-19 10:08:25,"jack" <ws...@163.com> 写道:
>使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中?
>
>
>场景:使用pyflink通过filter进行条件过滤后插入到sink中,
>比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中:
>{
> "logType":"syslog",
> "message":"sla;flkdsjf"
>}
>{
> "logType":"alarm",
> "message":"sla;flkdsjf"
>}
> t_env.from_path("source")\
> .filter("logType=syslog")\
> .insert_into("sink1")
>有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗:
>if logType=="syslog":
> insert_into(sink1)
>elif logType=="alarm":
> insert_into(sink2)
>
>
>如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下:
>
>
> t_env.from_path("source")\
> .filter("logType=syslog")\
> .insert_into("sink1")\
> .filter("logType=alarm")\
> .insert_into("sink2")
>请各位大牛指点,感谢
>
>
>
>
>
Re:Re: pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中
Posted by jack <ws...@163.com>.
您好,jincheng老师,我已经验证了您提供的这种分开处理的逻辑,可以解决我的问题,非常感谢您的解惑
Best,
Jack
在 2020-06-22 14:28:04,"jincheng sun" <su...@gmail.com> 写道:
您好,jack:
Table API 不用 if/else 直接用类似逻辑即可:
val t1 = table.filter('x > 2).groupBy(..)
val t2 = table.filter('x <= 2).groupBy(..)
t1.insert_into("sink1)
t2.insert_into("sink2")
Best,
Jincheng
jack <ws...@163.com> 于2020年6月19日周五 上午10:35写道:
测试使用如下结构:
table= t_env.from_path("source")
if table.filter("logType=syslog"):
table.filter("logType=syslog").insert_into("sink1")
elif table.filter("logType=alarm"):
table.filter("logType=alarm").insert_into("sink2")
我测试了下,好像table.filter("logType=syslog").insert_into("sink1")生效,下面的elif不生效,原因是table.filter("logType=syslog")或者table.where在做条件判断的同时已经将数据进行过滤,走不到下面的分支??
在 2020-06-19 10:08:25,"jack" <ws...@163.com> 写道:
>使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中?
>
>
>场景:使用pyflink通过filter进行条件过滤后插入到sink中,
>比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中:
>{
> "logType":"syslog",
> "message":"sla;flkdsjf"
>}
>{
> "logType":"alarm",
> "message":"sla;flkdsjf"
>}
> t_env.from_path("source")\
> .filter("logType=syslog")\
> .insert_into("sink1")
>有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗:
>if logType=="syslog":
> insert_into(sink1)
>elif logType=="alarm":
> insert_into(sink2)
>
>
>如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下:
>
>
> t_env.from_path("source")\
> .filter("logType=syslog")\
> .insert_into("sink1")\
> .filter("logType=alarm")\
> .insert_into("sink2")
>请各位大牛指点,感谢
>
>
>
>
>
Re: pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中
Posted by jincheng sun <su...@gmail.com>.
您好,jack:
Table API 不用 if/else 直接用类似逻辑即可:
val t1 = table.filter('x > 2).groupBy(..)
val t2 = table.filter('x <= 2).groupBy(..)
t1.insert_into("sink1)
t2.insert_into("sink2")
Best,
Jincheng
jack <ws...@163.com> 于2020年6月19日周五 上午10:35写道:
>
> 测试使用如下结构:
> table= t_env.from_path("source")
>
> if table.filter("logType=syslog"):
> table.filter("logType=syslog").insert_into("sink1")
> elif table.filter("logType=alarm"):
> table.filter("logType=alarm").insert_into("sink2")
>
>
> 我测试了下,好像table
> .filter("logType=syslog").insert_into("sink1")生效,下面的elif不生效,原因是
> table.filter("logType=syslog")或者table.where在做条件判断的同时已经将数据进行过滤,走不到下面的分支??
>
>
>
>
> 在 2020-06-19 10:08:25,"jack" <ws...@163.com> 写道:
> >使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中?
> >
> >
> >场景:使用pyflink通过filter进行条件过滤后插入到sink中,
> >比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中:
> >{
> > "logType":"syslog",
> > "message":"sla;flkdsjf"
> >}
> >{
> > "logType":"alarm",
> > "message":"sla;flkdsjf"
> >}
> > t_env.from_path("source")\
> > .filter("logType=syslog")\
> > .insert_into("sink1")
> >有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗:
> >if logType=="syslog":
> > insert_into(sink1)
> >elif logType=="alarm":
> > insert_into(sink2)
> >
> >
> >如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下:
> >
> >
> > t_env.from_path("source")\
> > .filter("logType=syslog")\
> > .insert_into("sink1")\
> > .filter("logType=alarm")\
> > .insert_into("sink2")
> >请各位大牛指点,感谢
> >
> >
> >
> >
> >
>
>
Re: pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中
Posted by jincheng sun <su...@gmail.com>.
您好,jack:
Table API 不用 if/else 直接用类似逻辑即可:
val t1 = table.filter('x > 2).groupBy(..)
val t2 = table.filter('x <= 2).groupBy(..)
t1.insert_into("sink1)
t2.insert_into("sink2")
Best,
Jincheng
jack <ws...@163.com> 于2020年6月19日周五 上午10:35写道:
>
> 测试使用如下结构:
> table= t_env.from_path("source")
>
> if table.filter("logType=syslog"):
> table.filter("logType=syslog").insert_into("sink1")
> elif table.filter("logType=alarm"):
> table.filter("logType=alarm").insert_into("sink2")
>
>
> 我测试了下,好像table
> .filter("logType=syslog").insert_into("sink1")生效,下面的elif不生效,原因是
> table.filter("logType=syslog")或者table.where在做条件判断的同时已经将数据进行过滤,走不到下面的分支??
>
>
>
>
> 在 2020-06-19 10:08:25,"jack" <ws...@163.com> 写道:
> >使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中?
> >
> >
> >场景:使用pyflink通过filter进行条件过滤后插入到sink中,
> >比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中:
> >{
> > "logType":"syslog",
> > "message":"sla;flkdsjf"
> >}
> >{
> > "logType":"alarm",
> > "message":"sla;flkdsjf"
> >}
> > t_env.from_path("source")\
> > .filter("logType=syslog")\
> > .insert_into("sink1")
> >有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗:
> >if logType=="syslog":
> > insert_into(sink1)
> >elif logType=="alarm":
> > insert_into(sink2)
> >
> >
> >如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下:
> >
> >
> > t_env.from_path("source")\
> > .filter("logType=syslog")\
> > .insert_into("sink1")\
> > .filter("logType=alarm")\
> > .insert_into("sink2")
> >请各位大牛指点,感谢
> >
> >
> >
> >
> >
>
>
Re:pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中
Posted by jack <ws...@163.com>.
测试使用如下结构:
table= t_env.from_path("source")
if table.filter("logType=syslog"):
table.filter("logType=syslog").insert_into("sink1")
elif table.filter("logType=alarm"):
table.filter("logType=alarm").insert_into("sink2")
我测试了下,好像table.filter("logType=syslog").insert_into("sink1")生效,下面的elif不生效,原因是table.filter("logType=syslog")或者table.where在做条件判断的同时已经将数据进行过滤,走不到下面的分支??
在 2020-06-19 10:08:25,"jack" <ws...@163.com> 写道:
>使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中?
>
>
>场景:使用pyflink通过filter进行条件过滤后插入到sink中,
>比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中:
>{
> "logType":"syslog",
> "message":"sla;flkdsjf"
>}
>{
> "logType":"alarm",
> "message":"sla;flkdsjf"
>}
> t_env.from_path("source")\
> .filter("logType=syslog")\
> .insert_into("sink1")
>有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗:
>if logType=="syslog":
> insert_into(sink1)
>elif logType=="alarm":
> insert_into(sink2)
>
>
>如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下:
>
>
> t_env.from_path("source")\
> .filter("logType=syslog")\
> .insert_into("sink1")\
> .filter("logType=alarm")\
> .insert_into("sink2")
>请各位大牛指点,感谢
>
>
>
>
>
Re:pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中
Posted by jack <ws...@163.com>.
测试使用如下结构:
table= t_env.from_path("source")
if table.filter("logType=syslog"):
table.filter("logType=syslog").insert_into("sink1")
elif table.filter("logType=alarm"):
table.filter("logType=alarm").insert_into("sink2")
我测试了下,好像table.filter("logType=syslog").insert_into("sink1")生效,下面的elif不生效,原因是table.filter("logType=syslog")或者table.where在做条件判断的同时已经将数据进行过滤,走不到下面的分支??
在 2020-06-19 10:08:25,"jack" <ws...@163.com> 写道:
>使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中?
>
>
>场景:使用pyflink通过filter进行条件过滤后插入到sink中,
>比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中:
>{
> "logType":"syslog",
> "message":"sla;flkdsjf"
>}
>{
> "logType":"alarm",
> "message":"sla;flkdsjf"
>}
> t_env.from_path("source")\
> .filter("logType=syslog")\
> .insert_into("sink1")
>有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗:
>if logType=="syslog":
> insert_into(sink1)
>elif logType=="alarm":
> insert_into(sink2)
>
>
>如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下:
>
>
> t_env.from_path("source")\
> .filter("logType=syslog")\
> .insert_into("sink1")\
> .filter("logType=alarm")\
> .insert_into("sink2")
>请各位大牛指点,感谢
>
>
>
>
>