Posted to issues@flink.apache.org by "xiaohang.li (Jira)" <ji...@apache.org> on 2020/08/14 09:19:00 UTC

[jira] [Updated] (FLINK-18960) flink sideoutput union

     [ https://issues.apache.org/jira/browse/FLINK-18960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiaohang.li updated FLINK-18960:
--------------------------------
    Affects Version/s: 1.11.0
                       1.11.1
          Description: 
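Unioning Flink side outputs produces wrong data. When side output streams split off from the main stream are unioned, the output consists of the records of the last unioned stream, repeated as many times as there are unioned streams.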

 

val side = new OutputTag[String]("side")
val side2 = new OutputTag[String]("side2")
val side3 = new OutputTag[String]("side3")
val ds = env.socketTextStream("master", 9001)
val res = ds.process(new ProcessFunction[String, String] {
  override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
    // Route each record to a side output based on its content.
    if (value.contains("hello")) {
      ctx.output(side, value)
    } else if (value.contains("world")) {
      ctx.output(side2, value)
    } else if (value.contains("flink")) {
      ctx.output(side3, value)
    }
    // Every record is also emitted on the main output.
    out.collect(value)
  }
})

val res1 = res.getSideOutput(side)
val res2 = res.getSideOutput(side2)
val res3 = res.getSideOutput(side3)

println("====>" + res1.getClass)
println("====>" + res2.getClass)

res1.print("res1")
res2.print("res2")
res3.print("res3")

res2.union(res1).union(res3).print("all")

Enter the following lines on the socket port, one at a time:

hello

world

flink

 

IntelliJ IDEA shows the following output:

res1> hello
 res2> world
 res3> flink
 all> flink
 all> flink
 all> flink

 

As shown, the "all" stream prints the records of the last unioned side output, repeated once per unioned stream. The expected output is:

all>flink

If a map operation is first applied to each side output stream, the code above prints the correct output, but this extra step should not be necessary.

  was:
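Unioning Flink side outputs produces wrong data. When side output streams split off from the main stream are unioned, the output consists of the records of the last unioned stream, repeated as many times as there are unioned streams.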

 

          Environment:     (was: val side = new OutputTag[String]("side")
val side2 = new OutputTag[String]("side2")
val side3 = new OutputTag[String]("side3")
val ds = env.socketTextStream("master",9001)
val res = ds.process(new ProcessFunction[String,String] {
 override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
 if(value.contains("hello")){
 ctx.output(side,value)
 }else if(value.contains("world")){
 ctx.output(side2,value)
 }else if(value.contains("flink")){
 ctx.output(side3,value)
 }
 out.collect(value)
 }
})

val res1 = res.getSideOutput(side)
val res2 = res.getSideOutput(side2)
val res3 = res.getSideOutput(side3)


println( "====>"+res1.getClass)
println( "====>"+res2.getClass)


res1.print("res1")
res2.print("res2")
res3.print("res3")

res2.union(res1).union(res3).print("all")

 

 

 

Enter the following lines on the socket port, one at a time:

hello

world

flink

 

IntelliJ IDEA shows the following output:

res1> hello
res2> world
res3> flink
all> flink
all> flink
all> flink

 

As shown, the "all" stream prints the records of the last unioned side output, repeated once per unioned stream. The expected output is:

all>flink)

> flink sideoutput union
> ----------------------
>
>                 Key: FLINK-18960
>                 URL: https://issues.apache.org/jira/browse/FLINK-18960
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.10.1, 1.11.0, 1.11.1
>            Reporter: xiaohang.li
>            Priority: Minor
>
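> Unioning Flink side outputs produces wrong data. When side output streams split off from the main stream are unioned, the output consists of the records of the last unioned stream, repeated as many times as there are unioned streams.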
>  
> val side = new OutputTag[String]("side")
> val side2 = new OutputTag[String]("side2")
> val side3 = new OutputTag[String]("side3")
> val ds = env.socketTextStream("master", 9001)
> val res = ds.process(new ProcessFunction[String, String] {
>   override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
>     if (value.contains("hello")) {
>       ctx.output(side, value)
>     } else if (value.contains("world")) {
>       ctx.output(side2, value)
>     } else if (value.contains("flink")) {
>       ctx.output(side3, value)
>     }
>     out.collect(value)
>   }
> })
> val res1 = res.getSideOutput(side)
> val res2 = res.getSideOutput(side2)
> val res3 = res.getSideOutput(side3)
> println("====>" + res1.getClass)
> println("====>" + res2.getClass)
> res1.print("res1")
> res2.print("res2")
> res3.print("res3")
> res2.union(res1).union(res3).print("all")
>  
>  
>  
> Enter the following lines on the socket port, one at a time:
> hello
> world
> flink
>  
> IntelliJ IDEA shows the following output:
> res1> hello
>  res2> world
>  res3> flink
>  all> flink
>  all> flink
>  all> flink
>  
> As shown, the "all" stream prints the records of the last unioned side output, repeated once per unioned stream. The expected output is:
> all>flink
> If a map operation is first applied to each side output stream, the code above prints the correct output, but this extra step should not be necessary.
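
The workaround the reporter describes (applying a map to each side output before the union) might look like the sketch below. It assumes Flink 1.11 with the Scala DataStream API on the classpath. The object name, the job name, and the use of an identity map are illustrative choices that do not come from the report, and the behaviour has not been re-verified here.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical object name; only the pipeline shape follows the report.
object SideOutputUnionWorkaround {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val side = new OutputTag[String]("side")
    val side2 = new OutputTag[String]("side2")
    val side3 = new OutputTag[String]("side3")

    // Host and port are the ones used in the report.
    val ds = env.socketTextStream("master", 9001)

    val res = ds.process(new ProcessFunction[String, String] {
      override def processElement(
          value: String,
          ctx: ProcessFunction[String, String]#Context,
          out: Collector[String]): Unit = {
        if (value.contains("hello")) ctx.output(side, value)
        else if (value.contains("world")) ctx.output(side2, value)
        else if (value.contains("flink")) ctx.output(side3, value)
        out.collect(value)
      }
    })

    // Workaround from the report: pass each side output through a map
    // (here an identity map, which is an assumption) before the union.
    val res1 = res.getSideOutput(side).map(v => v)
    val res2 = res.getSideOutput(side2).map(v => v)
    val res3 = res.getSideOutput(side3).map(v => v)

    res2.union(res1).union(res3).print("all")

    // Hypothetical job name.
    env.execute("side-output-union-workaround")
  }
}
```

The identity map leaves the records unchanged; its only purpose is to place an extra operator between each side output and the union, which is the step the reporter says should not be needed.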



--
This message was sent by Atlassian Jira
(v8.3.4#803005)