Posted to user-zh@flink.apache.org by 方如 <16...@qq.com> on 2020/02/27 06:34:55 UTC

Data in the window cannot be sent downstream

The code is as follows:
        // Convert the JSON into a LogBean
        SingleOutputStreamOperator<LogBean> data = filter.map(new Json2LogBean());

        KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = data.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LogBean>() {
            @Override
            public long extractAscendingTimestamp(LogBean element) {
                LocalDateTime parse = LocalDateTime.parse(element.getOperTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                long eventTime = parse.toEpochSecond(ZoneOffset.of("+8"));
                System.out.println(eventTime);
                return eventTime;
            }
        }).map(new MapFunction<LogBean, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(LogBean value) throws Exception {
                // Use the user id as the grouping key
                return new Tuple3<>(value.getNickname(), value.toString(), 1);
            }
        }).keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        });


        WindowedStream<Tuple3<String, String, Integer>, String, TimeWindow> window = tuple3StringKeyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));


        window.sum(2).print();

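Before switching to sum I was using reduce. Inside the reduce function I can print the data, but no result ever comes out after the reduce: both printing and writing to a file produce nothing.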


Many thanks!

Re: Data in the window cannot be sent downstream

Posted by Jimmy Wong <wa...@163.com>.
I suggest checking the watermark: print it out and see whether it is valid. By the way, the indentation of this code is a bit awkward.
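
One thing that may be worth looking at when printing the timestamps: Flink interprets event timestamps and watermarks as milliseconds since the epoch, while LocalDateTime.toEpochSecond returns seconds. If that is the cause here (an assumption; the thread does not confirm it), a 5-second session gap, which is 5000 ms, would be compared against values that advance by only 1 per second of real time, so a session would stay open until an event arrives roughly 5000 seconds after the previous one. The sketch below uses only java.time to show the difference in units; the class name, method names, and sample timestamps are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class EventTimeCheck {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final ZoneOffset OFFSET = ZoneOffset.of("+8");

    // Same conversion as in the posted code: seconds since the epoch.
    static long toEpochSeconds(String operTime) {
        return LocalDateTime.parse(operTime, FORMAT).toEpochSecond(OFFSET);
    }

    // Milliseconds since the epoch, the unit Flink uses for event time.
    static long toEpochMillis(String operTime) {
        return LocalDateTime.parse(operTime, FORMAT).toInstant(OFFSET).toEpochMilli();
    }

    public static void main(String[] args) {
        String first = "2020-02-27 14:34:55";
        String second = "2020-02-27 14:35:05"; // 10 seconds after the first

        // Difference when returning seconds: far smaller than a 5000 ms gap.
        System.out.println(toEpochSeconds(second) - toEpochSeconds(first)); // prints 10

        // Difference when returning milliseconds: larger than a 5000 ms gap.
        System.out.println(toEpochMillis(second) - toEpochMillis(first)); // prints 10000
    }
}
```

If the unit does turn out to be the problem, returning the millisecond value from extractAscendingTimestamp would be the corresponding change; other causes, such as the job not running with event time enabled, would need to be ruled out separately.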



