Posted to user-zh@flink.apache.org by Jimmy Wong <wa...@163.com> on 2020/02/27 09:51:35 UTC
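Re: Data in the window cannot be sent downstream
I'd suggest checking the watermark: print it out and see whether it is valid. Btw, the indentation of this code is a bit awkward.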
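One thing that may be worth checking when printing the values (this is a guess from reading the quoted code, not something confirmed in this thread): Flink treats event timestamps as epoch milliseconds, and `Time.seconds(5)` is a gap of 5000 ms, while `toEpochSecond` returns epoch seconds. The sketch below uses only the JDK to compare the two conversions. The class name `EventTimeCheck`, its method names, and the sample timestamps are made up for illustration.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the original job: compares the two
// ways of turning the "operTime" string into a numeric timestamp.
final class EventTimeCheck {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final ZoneOffset OFFSET = ZoneOffset.of("+8");

    // Same conversion as in the quoted code: the result is in epoch SECONDS.
    public static long toEpochSeconds(String operTime) {
        return LocalDateTime.parse(operTime, FORMAT).toEpochSecond(OFFSET);
    }

    // Alternative conversion: the result is in epoch MILLISECONDS.
    public static long toEpochMillis(String operTime) {
        return LocalDateTime.parse(operTime, FORMAT).toInstant(OFFSET).toEpochMilli();
    }

    public static void main(String[] args) {
        // Two sample events that are 10 seconds apart in wall-clock terms.
        String first = "2020-02-27 14:34:00";
        String later = "2020-02-27 14:34:10";

        // Difference between the two events under each conversion.
        System.out.println(toEpochSeconds(later) - toEpochSeconds(first));
        System.out.println(toEpochMillis(later) - toEpochMillis(first));
    }
}
```

If the job really does pass second-based values as timestamps, two events 10 seconds apart would look only 10 ms apart to a 5000 ms session gap, so a session would stay open until an event arrives thousands of seconds later. Returning the millisecond value from `extractAscendingTimestamp` would be the thing to try in that case.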
Jimmy Wong
On 2020-02-27 14:34, 方如 <16...@qq.com> wrote:
The code is as follows:

// Convert the JSON into a LogBean
SingleOutputStreamOperator<LogBean> data = filter.map(new Json2LogBean());
KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = data
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LogBean>() {
        @Override
        public long extractAscendingTimestamp(LogBean element) {
            LocalDateTime parse = LocalDateTime.parse(element.getOperTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            long eventTime = parse.toEpochSecond(ZoneOffset.of("+8"));
            System.out.println(eventTime);
            return eventTime;
        }
    })
    .map(new MapFunction<LogBean, Tuple3<String, String, Integer>>() {
        @Override
        public Tuple3<String, String, Integer> map(LogBean value) throws Exception {
            // Use the user id as the grouping key
            return new Tuple3<>(value.getNickname(), value.toString(), 1);
        }
    })
    .keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
        @Override
        public String getKey(Tuple3<String, String, Integer> value) throws Exception {
            return value.f0;
        }
    });
WindowedStream<Tuple3<String, String, Integer>, String, TimeWindow> window = tuple3StringKeyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
window.sum(2).print();

Before switching to sum I was using reduce. Inside the reduce function I can print the data, but the result after the reduce never shows up: both printing it and writing it to a file produce nothing.
Many thanks!