You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小昌同学 <cc...@163.com> on 2023/05/25 01:30:46 UTC

flink 窗口触发计算的条件

各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题;
我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒,
但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在:
相关代码以及样例数据如下:
|
package job;
import bean.MidInfo3;
import bean.Result;
import bean2.BaseInfo2;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import utils.DateUtil;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.Duration;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly9 {
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
//1、消费Kafka中的数据
String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
        String topicName = FlinkConfig.config.getProperty("dev_topicName");
        String groupId = FlinkConfig.config.getProperty("dev_groupId");
        String devMode = FlinkConfig.config.getProperty("dev_mode");
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", servers);
        prop.setProperty("group.id", groupId);
        prop.setProperty("auto.offset.reset", devMode);
        DataStreamSource<String> sourceStream = env.addSource(new FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
        sourceStream.print("最源端的数据sourceStream");

//2、对原始数据进行处理,获取到自己需要的数据,生成BaseInfo2基类数据
SingleOutputStreamOperator<BaseInfo2> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo2>() {
@Override
public BaseInfo2 map(String value) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");

                String[] splits = datas.split("\n");
                HashMap<String, String> dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及应答时间
String time = splits[0].substring(7, 19).replace("-", "").trim();
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
                        splits[i] = splits[i].replaceFirst("=", "&");
                        String[] temp = splits[i].split("&");
if (temp.length > 1) {
                            dataMap.put(temp[0].toLowerCase(), temp[1]);
                        }
                    }
                }
return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的功能描述", serverIp, DateUtil.string2Long(time), dataMap.get("handleserialno"), subData, System.currentTimeMillis());
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
        baseInfoStream.print("不加功能描述的 baseInfoStream");

//3、上述的数据流中的action仅仅是数字,需要关联一下MySQL去拿到对应的功能中文描述
SingleOutputStreamOperator<BaseInfo2> completeInfoStream = baseInfoStream.map(new MapFunction<BaseInfo2, BaseInfo2>() {
@Override
public BaseInfo2 map(BaseInfo2 value) throws Exception {
//拿到数据中携带的数字的action
String actionId = value.getFuncId();
                System.out.println("数据中的action编码是: " + actionId);
                String actionName = null;
                Connection connection = null;
                PreparedStatement ps = null;

//根据数据的action去MySQL中查找到对应的中午注释
try {
                    String sql = "select action_name from ActionType where action = ?";
                    connection = JdbcUtil.getConnection();
                    ps = connection.prepareStatement(sql);
                    ps.setString(1, actionId);
                    ResultSet resultSet = ps.executeQuery();
                    System.out.println("resultSet是" + resultSet);
if (resultSet.next()) {
                        actionName = resultSet.getString("action_name");
                    }
                } catch (Exception e) {
throw new RuntimeException(e);
                } finally {
                    JdbcUtil.closeResource(connection, ps);
                }
return new BaseInfo2(value.getFuncId(), actionName, value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), value.getEvenTime());
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
        completeInfoStream.print("加上中文描述的 completeInfoStream");

        SingleOutputStreamOperator<BaseInfo2> requestDataStream = completeInfoStream.filter(new FilterFunction<BaseInfo2>() {
@Override
public boolean filter(BaseInfo2 baseInfo2) throws Exception {
return baseInfo2.getInfo().contains("请求");
            }
        });
        SingleOutputStreamOperator<BaseInfo2> answerDataStream = completeInfoStream.filter(new FilterFunction<BaseInfo2>() {
@Override
public boolean filter(BaseInfo2 baseInfo2) throws Exception {
return baseInfo2.getInfo().contains("应答");
            }
        });
        requestDataStream.print("请求流是 requestDataStream");
        answerDataStream.print("应答流是 answerDataStream");

        DataStream<MidInfo3> joinStream = requestDataStream.join(answerDataStream)
                .where(BaseInfo2::getHandleSerialNo)
                .equalTo(BaseInfo2::getHandleSerialNo)
                .window(TumblingEventTimeWindows.of(Time.minutes(1L)))
                .apply(new JoinFunction<BaseInfo2, BaseInfo2, MidInfo3>() {
@Override
public MidInfo3 join(BaseInfo2 first, BaseInfo2 second) throws Exception {
                        System.out.println("以关联:" + first.getFuncId() + second.getEvenTime());
                        System.out.println("关联:" + first.getEvenTime() +"|" +second.getEvenTime()+"执行时间:"+System.currentTimeMillis());
return new MidInfo3(first.getFuncId(), first.getFuncIdDesc(), first.getServerIp(), first.getBaseTime(), second.getBaseTime(), first.getFuncId() + first.getServerIp(), first.getEvenTime());
                    }
                });
        joinStream.print("joinStream:");
        System.out.println("joinTime:"+System.currentTimeMillis());
        joinStream.keyBy(new KeySelector<MidInfo3, String>() {
@Override
public String getKey(MidInfo3 value) throws Exception {
return value.getFuncId() + value.getServerIp();
            }
        }).process(new ProcessFunction<MidInfo3, Result>() {
private ValueState<Long> timeState;

@Override
public void open(Configuration parameters) throws Exception {
                System.out.println("加载的是process中的open方法"+System.currentTimeMillis());
                ValueStateDescriptor<Long> timeStateDescriptor = new ValueStateDescriptor<>("timeState", Long.class);
// 过期状态清除
StateTtlConfig stateTtlConfig = StateTtlConfig
                        .newBuilder(org.apache.flink.api.common.time.Time.days(1))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();
// 开启ttl
timeStateDescriptor.enableTimeToLive(stateTtlConfig);

this.timeState = getRuntimeContext().getState(timeStateDescriptor);

            }

@Override
public void processElement(MidInfo3 value, ProcessFunction<MidInfo3, Result>.Context ctx, Collector<Result> out) throws Exception {
//获取到当前的状态值
if (null==timeState.value()){
timeState.update(value.getAnswerTime()-value.getRequesTime());
                }else {
if ((value.getAnswerTime() - value.getRequesTime()) < timeState.value()) {
timeState.update((value.getAnswerTime() - value.getRequesTime()));
                    }
                }

                out.collect(new Result(value.getFuncId(), value.getFuncIdDesc(), value.getServerIp(), timeState.value()));

            }
        }).print("结果是");

        env.execute();
    }
}



相关的数据样例如下;
{"ip":"10.125.8.20230525_0856","data":"请求: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=20230525_0856lmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
{"ip":"10.125.8.20230525_0856","data":"应答: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=20230525_0856lmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
|

回复: flink 窗口触发计算的条件

Posted by 小昌同学 <cc...@163.com>.
请教一下老师,您说的【同样数据的话,水印没有推进,窗口就不会触发】是不是意思是发送相同的数据,数据本身携带的时间戳是一样的,达不到水位线触发窗口的标准呀?
还有两个问题想请教一下各位老师:
1、事件时间窗口的闭合是取决于下一条数据所携带的时间戳嘛,只有当下一条数据携带的时间戳大于上一个窗口的endTime,窗口才会触发,如果是这个样子的话,那如果一个最后一个窗口怎么触发啊
2、我想使用stream api去打印出来窗口的起始时间以及结束时间,这个是哪一个api呀


| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | lxk<lx...@163.com> |
| 发送日期 | 2023年5月25日 10:14 |
| 收件人 | <us...@flink.apache.org> |
| 主题 | Re:回复: flink 窗口触发计算的条件 |
你好,可以先看看官方文档中关于事件时间和水印的介绍
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/time/
如果你发了多条数据,但是都是同样数据的话,水印没有推进,窗口就不会触发



















在 2023-05-25 10:00:36,"小昌同学" <cc...@163.com> 写道:
是的 我发送了很多数据,发现窗口还是没有触发


| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | yidan zhao<hi...@gmail.com> |
| 发送日期 | 2023年5月25日 09:59 |
| 收件人 | <us...@flink.apache.org> |
| 主题 | Re: flink 窗口触发计算的条件 |
如果你只发送了一条数据,那么watermark不会推进,就不会触发窗口计算。你需要更多数据。

小昌同学 <cc...@163.com> 于2023年5月25日周四 09:32写道:

各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题;
我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒,
但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在:
相关代码以及样例数据如下:
|
package job;
import bean.MidInfo3;
import bean.Result;
import bean2.BaseInfo2;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import utils.DateUtil;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.Duration;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly9 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
//1、消费Kafka中的数据
String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName = FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource<String> sourceStream = env.addSource(new FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
sourceStream.print("最源端的数据sourceStream");

//2、对原始数据进行处理,获取到自己需要的数据,生成BaseInfo2基类数据
SingleOutputStreamOperator<BaseInfo2> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo2>() {
@Override
public BaseInfo2 map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");

String[] splits = datas.split("\n");
HashMap<String, String> dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及应答时间
String time = splits[0].substring(7, 19).replace("-", "").trim();
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的功能描述", serverIp, DateUtil.string2Long(time), dataMap.get("handleserialno"), subData, System.currentTimeMillis());
}
}).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
baseInfoStream.print("不加功能描述的 baseInfoStream");

//3、上述的数据流中的action仅仅是数字,需要关联一下MySQL去拿到对应的功能中文描述
SingleOutputStreamOperator<BaseInfo2> completeInfoStream = baseInfoStream.map(new MapFunction<BaseInfo2, BaseInfo2>() {
@Override
public BaseInfo2 map(BaseInfo2 value) throws Exception {
//拿到数据中携带的数字的action
String actionId = value.getFuncId();
System.out.println("数据中的action编码是: " + actionId);
String actionName = null;
Connection connection = null;
PreparedStatement ps = null;

//根据数据的action去MySQL中查找到对应的中午注释
try {
String sql = "select action_name from ActionType where action = ?";
connection = JdbcUtil.getConnection();
ps = connection.prepareStatement(sql);
ps.setString(1, actionId);
ResultSet resultSet = ps.executeQuery();
System.out.println("resultSet是" + resultSet);
if (resultSet.next()) {
actionName = resultSet.getString("action_name");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
JdbcUtil.closeResource(connection, ps);
}
return new BaseInfo2(value.getFuncId(), actionName, value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), value.getEvenTime());
}
}).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
completeInfoStream.print("加上中文描述的 completeInfoStream");

SingleOutputStreamOperator<BaseInfo2> requestDataStream = completeInfoStream.filter(new FilterFunction<BaseInfo2>() {
@Override
public boolean filter(BaseInfo2 baseInfo2) throws Exception {
return baseInfo2.getInfo().contains("请求");
}
});
SingleOutputStreamOperator<BaseInfo2> answerDataStream = completeInfoStream.filter(new FilterFunction<BaseInfo2>() {
@Override
public boolean filter(BaseInfo2 baseInfo2) throws Exception {
return baseInfo2.getInfo().contains("应答");
}
});
requestDataStream.print("请求流是 requestDataStream");
answerDataStream.print("应答流是 answerDataStream");

DataStream<MidInfo3> joinStream = requestDataStream.join(answerDataStream)
.where(BaseInfo2::getHandleSerialNo)
.equalTo(BaseInfo2::getHandleSerialNo)
.window(TumblingEventTimeWindows.of(Time.minutes(1L)))
.apply(new JoinFunction<BaseInfo2, BaseInfo2, MidInfo3>() {
@Override
public MidInfo3 join(BaseInfo2 first, BaseInfo2 second) throws Exception {
System.out.println("以关联:" + first.getFuncId() + second.getEvenTime());
System.out.println("关联:" + first.getEvenTime() +"|" +second.getEvenTime()+"执行时间:"+System.currentTimeMillis());
return new MidInfo3(first.getFuncId(), first.getFuncIdDesc(), first.getServerIp(), first.getBaseTime(), second.getBaseTime(), first.getFuncId() + first.getServerIp(), first.getEvenTime());
}
});
joinStream.print("joinStream:");
System.out.println("joinTime:"+System.currentTimeMillis());
joinStream.keyBy(new KeySelector<MidInfo3, String>() {
@Override
public String getKey(MidInfo3 value) throws Exception {
return value.getFuncId() + value.getServerIp();
}
}).process(new ProcessFunction<MidInfo3, Result>() {
private ValueState<Long> timeState;

@Override
public void open(Configuration parameters) throws Exception {
System.out.println("加载的是process中的open方法"+System.currentTimeMillis());
ValueStateDescriptor<Long> timeStateDescriptor = new ValueStateDescriptor<>("timeState", Long.class);
// 过期状态清除
StateTtlConfig stateTtlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.days(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
// 开启ttl
timeStateDescriptor.enableTimeToLive(stateTtlConfig);

this.timeState = getRuntimeContext().getState(timeStateDescriptor);

}

@Override
public void processElement(MidInfo3 value, ProcessFunction<MidInfo3, Result>.Context ctx, Collector<Result> out) throws Exception {
//获取到当前的状态值
if (null==timeState.value()){
timeState.update(value.getAnswerTime()-value.getRequesTime());
}else {
if ((value.getAnswerTime() - value.getRequesTime()) < timeState.value()) {
timeState.update((value.getAnswerTime() - value.getRequesTime()));
}
}

out.collect(new Result(value.getFuncId(), value.getFuncIdDesc(), value.getServerIp(), timeState.value()));

}
}).print("结果是");

env.execute();
}
}



相关的数据样例如下;
{"ip":"10.125.8.20230525_0856","data":"请求: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=20230525_0856lmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
{"ip":"10.125.8.20230525_0856","data":"应答: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=20230525_0856lmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
|

Re:回复: flink 窗口触发计算的条件

Posted by lxk <lx...@163.com>.
你好,可以先看看官方文档中关于事件时间和水印的介绍
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/time/
如果你发了多条数据,但是都是同样数据的话,水印没有推进,窗口就不会触发



















在 2023-05-25 10:00:36,"小昌同学" <cc...@163.com> 写道:
>是的 我发送了很多数据,发现窗口还是没有触发
>
>
>| |
>小昌同学
>|
>|
>ccc0606fighting@163.com
>|
>---- 回复的原邮件 ----
>| 发件人 | yidan zhao<hi...@gmail.com> |
>| 发送日期 | 2023年5月25日 09:59 |
>| 收件人 | <us...@flink.apache.org> |
>| 主题 | Re: flink 窗口触发计算的条件 |
>如果你只发送了一条数据,那么watermark不会推进,就不会触发窗口计算。你需要更多数据。
>
>小昌同学 <cc...@163.com> 于2023年5月25日周四 09:32写道:
>
>各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题;
>我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒,
>但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在:
>相关代码以及样例数据如下:
>|
>package job;
>import bean.MidInfo3;
>import bean.Result;
>import bean2.BaseInfo2;
>import com.alibaba.fastjson.JSON;
>import com.alibaba.fastjson.JSONObject;
>import config.FlinkConfig;
>import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>import org.apache.flink.api.common.functions.FilterFunction;
>import org.apache.flink.api.common.functions.JoinFunction;
>import org.apache.flink.api.common.functions.MapFunction;
>import org.apache.flink.api.common.serialization.SimpleStringSchema;
>import org.apache.flink.api.common.state.StateTtlConfig;
>import org.apache.flink.api.common.state.ValueState;
>import org.apache.flink.api.common.state.ValueStateDescriptor;
>import org.apache.flink.api.java.functions.KeySelector;
>import org.apache.flink.configuration.Configuration;
>import org.apache.flink.streaming.api.datastream.ConnectedStreams;
>import org.apache.flink.streaming.api.datastream.DataStream;
>import org.apache.flink.streaming.api.datastream.DataStreamSource;
>import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.streaming.api.functions.ProcessFunction;
>import org.apache.flink.streaming.api.functions.co.CoMapFunction;
>import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
>import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
>import org.apache.flink.streaming.api.windowing.time.Time;
>import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
>import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>import org.apache.flink.util.Collector;
>import utils.DateUtil;
>import utils.JdbcUtil;
>
>import java.sql.Connection;
>import java.sql.PreparedStatement;
>import java.sql.ResultSet;
>import java.time.Duration;
>import java.util.HashMap;
>import java.util.Properties;
>
>public class RytLogAnly9 {
>public static void main(String[] args) throws Exception {
>StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>env.disableOperatorChaining();
>//1、消费Kafka中的数据
>String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
>String topicName = FlinkConfig.config.getProperty("dev_topicName");
>String groupId = FlinkConfig.config.getProperty("dev_groupId");
>String devMode = FlinkConfig.config.getProperty("dev_mode");
>Properties prop = new Properties();
>prop.setProperty("bootstrap.servers", servers);
>prop.setProperty("group.id", groupId);
>prop.setProperty("auto.offset.reset", devMode);
>DataStreamSource<String> sourceStream = env.addSource(new FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
>sourceStream.print("最源端的数据sourceStream");
>
>//2、对原始数据进行处理,获取到自己需要的数据,生成BaseInfo2基类数据
>SingleOutputStreamOperator<BaseInfo2> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo2>() {
>@Override
>public BaseInfo2 map(String value) throws Exception {
>JSONObject jsonObject = JSON.parseObject(value);
>//获取到不同的服务器IP
>String serverIp = jsonObject.getString("ip");
>//获取到不同的data的数据
>String datas = jsonObject.getString("data");
>
>String[] splits = datas.split("\n");
>HashMap<String, String> dataMap = new HashMap<>();
>//将time填充到自定义类型中,用来判断同一个num的请求以及应答时间
>String time = splits[0].substring(7, 19).replace("-", "").trim();
>//将subData填充到自定义类型中,用来判断时请求还是应答
>String subData = datas.substring(0, 10);
>for (int i = 0; i < splits.length; i++) {
>if (splits[i].contains("=")) {
>splits[i] = splits[i].replaceFirst("=", "&");
>String[] temp = splits[i].split("&");
>if (temp.length > 1) {
>dataMap.put(temp[0].toLowerCase(), temp[1]);
>}
>}
>}
>return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的功能描述", serverIp, DateUtil.string2Long(time), dataMap.get("handleserialno"), subData, System.currentTimeMillis());
>}
>}).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
>baseInfoStream.print("不加功能描述的 baseInfoStream");
>
>//3、上述的数据流中的action仅仅是数字,需要关联一下MySQL去拿到对应的功能中文描述
>SingleOutputStreamOperator<BaseInfo2> completeInfoStream = baseInfoStream.map(new MapFunction<BaseInfo2, BaseInfo2>() {
>@Override
>public BaseInfo2 map(BaseInfo2 value) throws Exception {
>//拿到数据中携带的数字的action
>String actionId = value.getFuncId();
>System.out.println("数据中的action编码是: " + actionId);
>String actionName = null;
>Connection connection = null;
>PreparedStatement ps = null;
>
>//根据数据的action去MySQL中查找到对应的中午注释
>try {
>String sql = "select action_name from ActionType where action = ?";
>connection = JdbcUtil.getConnection();
>ps = connection.prepareStatement(sql);
>ps.setString(1, actionId);
>ResultSet resultSet = ps.executeQuery();
>System.out.println("resultSet是" + resultSet);
>if (resultSet.next()) {
>actionName = resultSet.getString("action_name");
>}
>} catch (Exception e) {
>throw new RuntimeException(e);
>} finally {
>JdbcUtil.closeResource(connection, ps);
>}
>return new BaseInfo2(value.getFuncId(), actionName, value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), value.getEvenTime());
>}
>}).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
>completeInfoStream.print("加上中文描述的 completeInfoStream");
>
>SingleOutputStreamOperator<BaseInfo2> requestDataStream = completeInfoStream.filter(new FilterFunction<BaseInfo2>() {
>@Override
>public boolean filter(BaseInfo2 baseInfo2) throws Exception {
>return baseInfo2.getInfo().contains("请求");
>}
>});
>SingleOutputStreamOperator<BaseInfo2> answerDataStream = completeInfoStream.filter(new FilterFunction<BaseInfo2>() {
>@Override
>public boolean filter(BaseInfo2 baseInfo2) throws Exception {
>return baseInfo2.getInfo().contains("应答");
>}
>});
>requestDataStream.print("请求流是 requestDataStream");
>answerDataStream.print("应答流是 answerDataStream");
>
>DataStream<MidInfo3> joinStream = requestDataStream.join(answerDataStream)
>.where(BaseInfo2::getHandleSerialNo)
>.equalTo(BaseInfo2::getHandleSerialNo)
>.window(TumblingEventTimeWindows.of(Time.minutes(1L)))
>.apply(new JoinFunction<BaseInfo2, BaseInfo2, MidInfo3>() {
>@Override
>public MidInfo3 join(BaseInfo2 first, BaseInfo2 second) throws Exception {
>System.out.println("以关联:" + first.getFuncId() + second.getEvenTime());
>System.out.println("关联:" + first.getEvenTime() +"|" +second.getEvenTime()+"执行时间:"+System.currentTimeMillis());
>return new MidInfo3(first.getFuncId(), first.getFuncIdDesc(), first.getServerIp(), first.getBaseTime(), second.getBaseTime(), first.getFuncId() + first.getServerIp(), first.getEvenTime());
>}
>});
>joinStream.print("joinStream:");
>System.out.println("joinTime:"+System.currentTimeMillis());
>joinStream.keyBy(new KeySelector<MidInfo3, String>() {
>@Override
>public String getKey(MidInfo3 value) throws Exception {
>return value.getFuncId() + value.getServerIp();
>}
>}).process(new ProcessFunction<MidInfo3, Result>() {
>private ValueState<Long> timeState;
>
>@Override
>public void open(Configuration parameters) throws Exception {
>System.out.println("加载的是process中的open方法"+System.currentTimeMillis());
>ValueStateDescriptor<Long> timeStateDescriptor = new ValueStateDescriptor<>("timeState", Long.class);
>// 过期状态清除
>StateTtlConfig stateTtlConfig = StateTtlConfig
>.newBuilder(org.apache.flink.api.common.time.Time.days(1))
>.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>.build();
>// 开启ttl
>timeStateDescriptor.enableTimeToLive(stateTtlConfig);
>
>this.timeState = getRuntimeContext().getState(timeStateDescriptor);
>
>}
>
>@Override
>public void processElement(MidInfo3 value, ProcessFunction<MidInfo3, Result>.Context ctx, Collector<Result> out) throws Exception {
>//获取到当前的状态值
>if (null==timeState.value()){
>timeState.update(value.getAnswerTime()-value.getRequesTime());
>}else {
>if ((value.getAnswerTime() - value.getRequesTime()) < timeState.value()) {
>timeState.update((value.getAnswerTime() - value.getRequesTime()));
>}
>}
>
>out.collect(new Result(value.getFuncId(), value.getFuncIdDesc(), value.getServerIp(), timeState.value()));
>
>}
>}).print("结果是");
>
>env.execute();
>}
>}
>
>
>
>相关的数据样例如下;
>{"ip":"10.125.8.20230525_0856","data":"请求: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=20230525_0856lmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
>{"ip":"10.125.8.20230525_0856","data":"应答: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=20230525_0856lmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
>|

回复: flink 窗口触发计算的条件

Posted by 小昌同学 <cc...@163.com>.
是的 我发送了很多数据,发现窗口还是没有触发


| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | yidan zhao<hi...@gmail.com> |
| 发送日期 | 2023年5月25日 09:59 |
| 收件人 | <us...@flink.apache.org> |
| 主题 | Re: flink 窗口触发计算的条件 |
如果你只发送了一条数据,那么watermark不会推进,就不会触发窗口计算。你需要更多数据。

小昌同学 <cc...@163.com> 于2023年5月25日周四 09:32写道:

各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题;
我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒,
但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在:
相关代码以及样例数据如下:
|
package job;
import bean.MidInfo3;
import bean.Result;
import bean2.BaseInfo2;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import utils.DateUtil;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.Duration;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly9 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
//1、消费Kafka中的数据
String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName = FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource<String> sourceStream = env.addSource(new FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
sourceStream.print("最源端的数据sourceStream");

//2、对原始数据进行处理,获取到自己需要的数据,生成BaseInfo2基类数据
SingleOutputStreamOperator<BaseInfo2> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo2>() {
@Override
public BaseInfo2 map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");

String[] splits = datas.split("\n");
HashMap<String, String> dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及应答时间
String time = splits[0].substring(7, 19).replace("-", "").trim();
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的功能描述", serverIp, DateUtil.string2Long(time), dataMap.get("handleserialno"), subData, System.currentTimeMillis());
}
}).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
baseInfoStream.print("不加功能描述的 baseInfoStream");

//3、上述的数据流中的action仅仅是数字,需要关联一下MySQL去拿到对应的功能中文描述
SingleOutputStreamOperator<BaseInfo2> completeInfoStream = baseInfoStream.map(new MapFunction<BaseInfo2, BaseInfo2>() {
@Override
public BaseInfo2 map(BaseInfo2 value) throws Exception {
//拿到数据中携带的数字的action
String actionId = value.getFuncId();
System.out.println("数据中的action编码是: " + actionId);
String actionName = null;
Connection connection = null;
PreparedStatement ps = null;

//根据数据的action去MySQL中查找到对应的中午注释
try {
String sql = "select action_name from ActionType where action = ?";
connection = JdbcUtil.getConnection();
ps = connection.prepareStatement(sql);
ps.setString(1, actionId);
ResultSet resultSet = ps.executeQuery();
System.out.println("resultSet是" + resultSet);
if (resultSet.next()) {
actionName = resultSet.getString("action_name");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
JdbcUtil.closeResource(connection, ps);
}
return new BaseInfo2(value.getFuncId(), actionName, value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), value.getEvenTime());
}
}).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
completeInfoStream.print("加上中文描述的 completeInfoStream");

SingleOutputStreamOperator<BaseInfo2> requestDataStream = completeInfoStream.filter(new FilterFunction<BaseInfo2>() {
@Override
public boolean filter(BaseInfo2 baseInfo2) throws Exception {
return baseInfo2.getInfo().contains("请求");
}
});
SingleOutputStreamOperator<BaseInfo2> answerDataStream = completeInfoStream.filter(new FilterFunction<BaseInfo2>() {
@Override
public boolean filter(BaseInfo2 baseInfo2) throws Exception {
return baseInfo2.getInfo().contains("应答");
}
});
requestDataStream.print("请求流是 requestDataStream");
answerDataStream.print("应答流是 answerDataStream");

DataStream<MidInfo3> joinStream = requestDataStream.join(answerDataStream)
.where(BaseInfo2::getHandleSerialNo)
.equalTo(BaseInfo2::getHandleSerialNo)
.window(TumblingEventTimeWindows.of(Time.minutes(1L)))
.apply(new JoinFunction<BaseInfo2, BaseInfo2, MidInfo3>() {
@Override
public MidInfo3 join(BaseInfo2 first, BaseInfo2 second) throws Exception {
System.out.println("以关联:" + first.getFuncId() + second.getEvenTime());
System.out.println("关联:" + first.getEvenTime() +"|" +second.getEvenTime()+"执行时间:"+System.currentTimeMillis());
return new MidInfo3(first.getFuncId(), first.getFuncIdDesc(), first.getServerIp(), first.getBaseTime(), second.getBaseTime(), first.getFuncId() + first.getServerIp(), first.getEvenTime());
}
});
joinStream.print("joinStream:");
System.out.println("joinTime:"+System.currentTimeMillis());
joinStream.keyBy(new KeySelector<MidInfo3, String>() {
@Override
public String getKey(MidInfo3 value) throws Exception {
return value.getFuncId() + value.getServerIp();
}
}).process(new ProcessFunction<MidInfo3, Result>() {
private ValueState<Long> timeState;

@Override
public void open(Configuration parameters) throws Exception {
System.out.println("加载的是process中的open方法"+System.currentTimeMillis());
ValueStateDescriptor<Long> timeStateDescriptor = new ValueStateDescriptor<>("timeState", Long.class);
// 过期状态清除
StateTtlConfig stateTtlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.days(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
// 开启ttl
timeStateDescriptor.enableTimeToLive(stateTtlConfig);

this.timeState = getRuntimeContext().getState(timeStateDescriptor);

}

@Override
public void processElement(MidInfo3 value, ProcessFunction<MidInfo3, Result>.Context ctx, Collector<Result> out) throws Exception {
//获取到当前的状态值
if (null==timeState.value()){
timeState.update(value.getAnswerTime()-value.getRequesTime());
}else {
if ((value.getAnswerTime() - value.getRequesTime()) < timeState.value()) {
timeState.update((value.getAnswerTime() - value.getRequesTime()));
}
}

out.collect(new Result(value.getFuncId(), value.getFuncIdDesc(), value.getServerIp(), timeState.value()));

}
}).print("结果是");

env.execute();
}
}



相关的数据样例如下;
{"ip":"10.125.8.20230525_0856","data":"请求: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=20230525_0856lmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
{"ip":"10.125.8.20230525_0856","data":"应答: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=20230525_0856lmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
|

Re: flink 窗口触发计算的条件

Posted by yidan zhao <hi...@gmail.com>.
如果你只发送了一条数据,那么watermark不会推进,就不会触发窗口计算。你需要更多数据。

小昌同学 <cc...@163.com> 于2023年5月25日周四 09:32写道:
>
> 各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题;
> 我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒,
> 但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在:
> 相关代码以及样例数据如下:
> |
> package job;
> import bean.MidInfo3;
> import bean.Result;
> import bean2.BaseInfo2;
> import com.alibaba.fastjson.JSON;
> import com.alibaba.fastjson.JSONObject;
> import config.FlinkConfig;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.common.functions.JoinFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.state.StateTtlConfig;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.ConnectedStreams;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.streaming.api.functions.co.CoMapFunction;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.flink.util.Collector;
> import utils.DateUtil;
> import utils.JdbcUtil;
>
> import java.sql.Connection;
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
> import java.time.Duration;
> import java.util.HashMap;
> import java.util.Properties;
>
> public class RytLogAnly9 {
> public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.disableOperatorChaining();
> //1、消费Kafka中的数据
> String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
>         String topicName = FlinkConfig.config.getProperty("dev_topicName");
>         String groupId = FlinkConfig.config.getProperty("dev_groupId");
>         String devMode = FlinkConfig.config.getProperty("dev_mode");
>         Properties prop = new Properties();
>         prop.setProperty("bootstrap.servers", servers);
>         prop.setProperty("group.id", groupId);
>         prop.setProperty("auto.offset.reset", devMode);
>         DataStreamSource<String> sourceStream = env.addSource(new FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
>         sourceStream.print("最源端的数据sourceStream");
>
> //2、对原始数据进行处理,获取到自己需要的数据,生成BaseInfo2基类数据
> SingleOutputStreamOperator<BaseInfo2> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo2>() {
> @Override
> public BaseInfo2 map(String value) throws Exception {
>                 JSONObject jsonObject = JSON.parseObject(value);
> //获取到不同的服务器IP
> String serverIp = jsonObject.getString("ip");
> //获取到不同的data的数据
> String datas = jsonObject.getString("data");
>
>                 String[] splits = datas.split("\n");
>                 HashMap<String, String> dataMap = new HashMap<>();
> //将time填充到自定义类型中,用来判断同一个num的请求以及应答时间
> String time = splits[0].substring(7, 19).replace("-", "").trim();
> //将subData填充到自定义类型中,用来判断时请求还是应答
> String subData = datas.substring(0, 10);
> for (int i = 0; i < splits.length; i++) {
> if (splits[i].contains("=")) {
>                         splits[i] = splits[i].replaceFirst("=", "&");
>                         String[] temp = splits[i].split("&");
> if (temp.length > 1) {
>                             dataMap.put(temp[0].toLowerCase(), temp[1]);
>                         }
>                     }
>                 }
> return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的功能描述", serverIp, DateUtil.string2Long(time), dataMap.get("handleserialno"), subData, System.currentTimeMillis());
>             }
>         }).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
>         baseInfoStream.print("不加功能描述的 baseInfoStream");
>
> //3、上述的数据流中的action仅仅是数字,需要关联一下MySQL去拿到对应的功能中文描述
> SingleOutputStreamOperator<BaseInfo2> completeInfoStream = baseInfoStream.map(new MapFunction<BaseInfo2, BaseInfo2>() {
> @Override
> public BaseInfo2 map(BaseInfo2 value) throws Exception {
> //拿到数据中携带的数字的action
> String actionId = value.getFuncId();
>                 System.out.println("数据中的action编码是: " + actionId);
>                 String actionName = null;
>                 Connection connection = null;
>                 PreparedStatement ps = null;
>
> //根据数据的action去MySQL中查找到对应的中午注释
> try {
>                     String sql = "select action_name from ActionType where action = ?";
>                     connection = JdbcUtil.getConnection();
>                     ps = connection.prepareStatement(sql);
>                     ps.setString(1, actionId);
>                     ResultSet resultSet = ps.executeQuery();
>                     System.out.println("resultSet是" + resultSet);
> if (resultSet.next()) {
>                         actionName = resultSet.getString("action_name");
>                     }
>                 } catch (Exception e) {
> throw new RuntimeException(e);
>                 } finally {
>                     JdbcUtil.closeResource(connection, ps);
>                 }
> return new BaseInfo2(value.getFuncId(), actionName, value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), value.getEvenTime());
>             }
>         }).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
>         completeInfoStream.print("加上中文描述的 completeInfoStream");
>
>         SingleOutputStreamOperator<BaseInfo2> requestDataStream = completeInfoStream.filter(new FilterFunction<BaseInfo2>() {
> @Override
> public boolean filter(BaseInfo2 baseInfo2) throws Exception {
> return baseInfo2.getInfo().contains("请求");
>             }
>         });
>         SingleOutputStreamOperator<BaseInfo2> answerDataStream = completeInfoStream.filter(new FilterFunction<BaseInfo2>() {
> @Override
> public boolean filter(BaseInfo2 baseInfo2) throws Exception {
> return baseInfo2.getInfo().contains("应答");
>             }
>         });
>         requestDataStream.print("请求流是 requestDataStream");
>         answerDataStream.print("应答流是 answerDataStream");
>
>         DataStream<MidInfo3> joinStream = requestDataStream.join(answerDataStream)
>                 .where(BaseInfo2::getHandleSerialNo)
>                 .equalTo(BaseInfo2::getHandleSerialNo)
>                 .window(TumblingEventTimeWindows.of(Time.minutes(1L)))
>                 .apply(new JoinFunction<BaseInfo2, BaseInfo2, MidInfo3>() {
> @Override
> public MidInfo3 join(BaseInfo2 first, BaseInfo2 second) throws Exception {
>                         System.out.println("以关联:" + first.getFuncId() + second.getEvenTime());
>                         System.out.println("关联:" + first.getEvenTime() +"|" +second.getEvenTime()+"执行时间:"+System.currentTimeMillis());
> return new MidInfo3(first.getFuncId(), first.getFuncIdDesc(), first.getServerIp(), first.getBaseTime(), second.getBaseTime(), first.getFuncId() + first.getServerIp(), first.getEvenTime());
>                     }
>                 });
>         joinStream.print("joinStream:");
>         System.out.println("joinTime:"+System.currentTimeMillis());
>         joinStream.keyBy(new KeySelector<MidInfo3, String>() {
> @Override
> public String getKey(MidInfo3 value) throws Exception {
> return value.getFuncId() + value.getServerIp();
>             }
>         }).process(new ProcessFunction<MidInfo3, Result>() {
> private ValueState<Long> timeState;
>
> @Override
> public void open(Configuration parameters) throws Exception {
>                 System.out.println("加载的是process中的open方法"+System.currentTimeMillis());
>                 ValueStateDescriptor<Long> timeStateDescriptor = new ValueStateDescriptor<>("timeState", Long.class);
> // 过期状态清除
> StateTtlConfig stateTtlConfig = StateTtlConfig
>                         .newBuilder(org.apache.flink.api.common.time.Time.days(1))
>                         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                         .build();
> // 开启ttl
> timeStateDescriptor.enableTimeToLive(stateTtlConfig);
>
> this.timeState = getRuntimeContext().getState(timeStateDescriptor);
>
>             }
>
> @Override
> public void processElement(MidInfo3 value, ProcessFunction<MidInfo3, Result>.Context ctx, Collector<Result> out) throws Exception {
> //获取到当前的状态值
> if (null==timeState.value()){
> timeState.update(value.getAnswerTime()-value.getRequesTime());
>                 }else {
> if ((value.getAnswerTime() - value.getRequesTime()) < timeState.value()) {
> timeState.update((value.getAnswerTime() - value.getRequesTime()));
>                     }
>                 }
>
>                 out.collect(new Result(value.getFuncId(), value.getFuncIdDesc(), value.getServerIp(), timeState.value()));
>
>             }
>         }).print("结果是");
>
>         env.execute();
>     }
> }
>
>
>
> 相关的数据样例如下;
> {"ip":"10.125.8.20230525_0856","data":"请求: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=20230525_0856lmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
> {"ip":"10.125.8.20230525_0856","data":"应答: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=20230525_0856lmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
> |