You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by hery168 <he...@163.com> on 2020/04/28 09:24:10 UTC

flink dataset 分组后拼接分组后内容

col1 col2 pid 

1.0  2.0  1

2.0  2.0  1

1.0  2.0  1

3.0  2.0  1

1.0  2.0  1

1.0  2.0  2

1.0  2.0  2

1.0  2.0  2

1.0  2.0  2

1.0  2.0  2
各位大神,想问一下利用flink dataset 对pid 列进行分组,然后对分组后的col1列的内容进行拼接,如1.0#2.0#1.0#3.0....
请问大家这个该怎么实现?

Re: flink dataset 分组后拼接分组后内容

Posted by 祝尚 <17...@163.com>.
Hi,hery168
可以这样写

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Tuple3.of(1.0, 2.0, 1), Tuple3.of(2.0, 2.0, 1),Tuple3.of(3.0,2.0,1), Tuple3.of(1.0, 2.0, 2), Tuple3.of(2.0, 2.0, 2),Tuple3.of(3.0,2.0,2))
        .map((MapFunction<Tuple3<Double, Double, Integer>, Tuple3<String, Double, Integer>>) t -> Tuple3.of(String.valueOf(t.f0), t.f1, t.f2)).groupBy(2)
        .reduce((ReduceFunction<Tuple3<String, Double, Integer>>) (tuple, t1) -> Tuple3.of(tuple.f0 + "#" + t1.f0, tuple.f1, tuple.f2)).print();
env.execute();
结果:
(1.0#2.0#3.0,2.0,1)
(1.0#2.0#3.0,2.0,2)

> 2020年4月28日 下午5:24,hery168 <he...@163.com> 写道:
> 
> col1 col2 pid 
> 
> 1.0  2.0  1
> 
> 2.0  2.0  1
> 
> 1.0  2.0  1
> 
> 3.0  2.0  1
> 
> 1.0  2.0  1
> 
> 1.0  2.0  2
> 
> 1.0  2.0  2
> 
> 1.0  2.0  2
> 
> 1.0  2.0  2
> 
> 1.0  2.0  2
> 各位大神,想问一下利用flink dataset 对pid 列进行分组,然后对分组后的col1列的内容进行拼接,如1.0#2.0#1.0#3.0....
> 请问大家这个该怎么实现?