You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小昌同学 <cc...@163.com> on 2023/05/17 01:28:07 UTC
table api定义rowtime未生效
各位老师好,以下是我的代码:
| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime()); tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" + "FROM TABLE(CUMULATE(\n" + " TABLE midTable1"+ //" TABLE "+ midTable + " , DESCRIPTOR(eventTime)\n" + " , INTERVAL '60' SECOND\n" + " , INTERVAL '1' DAY))\n" + " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fighting@163.com
|
回复:table api定义rowtime未生效
Posted by 小昌同学 <cc...@163.com>.
你好,老师,以下是midStream的代码来源;文件最后我加上了整个类的编写逻辑
|
//6、讲两条流转换为对应的表 进行关联取最小值
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//设置状态值为1天 这种设置是有问题的 因为这种空闲状态会不断的刷新,达不到一天一算的效果
//tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));
// tableEnv.createTemporaryView("tableRequest", outRequestDataStream);
// tableEnv.createTemporaryView("tableAnswer", outAnswerDataStream);
Table tableRequest =tableEnv.fromDataStream(outRequestDataStream, $("funcId"), $("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), $("eventTime").rowtime().as("et"));
// Table tableRequest = tableEnv.fromDataStream(outRequestDataStream, Schema.newBuilder()
// .column("funcId", DataTypes.STRING())
// .column("serverIp", DataTypes.STRING())
// .column("outTime", DataTypes.BIGINT())
// .column("handleSerialNo", DataTypes.STRING())
// .column("info", DataTypes.STRING())
// .column("funcIdDesc", DataTypes.STRING())
// .column("eventTime", DataTypes.TIMESTAMP(3))
// .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
// .build());
Table tableAnswer =tableEnv.fromDataStream(outAnswerDataStream, $("funcId"), $("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), $("eventTime").rowtime());
// Table tableAnswer = tableEnv.fromDataStream(outAnswerDataStream, Schema.newBuilder()
// .column("funcId", DataTypes.STRING())
// .column("serverIp", DataTypes.STRING())
// .column("outTime", DataTypes.BIGINT())
// .column("handleSerialNo", DataTypes.STRING())
// .column("info", DataTypes.STRING())
// .column("funcIdDesc", DataTypes.STRING())
// .column("eventTime", DataTypes.TIMESTAMP(3))
// .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
// .build());
Table result = tableEnv.sqlQuery("select \n" +
"\ta.funcId as funcId ,\n" +
"\ta.funcIdDesc as funcIdDesc,\n" +
"\ta.serverIp as serverIp,\n" +
"\tb.outTime as maxTime,\n" +
"\ta.outTime as minTime,\t\n" +
"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
" a.et as et\n" +
" from " + tableRequest + " a\n " +
" inner join " + tableAnswer + " b" +
" on a.handleSerialNo=b.handleSerialNo ");
System.out.println("这个是resultTable" + result);
result.printSchema();
tableEnv.createTemporaryView("resultTable", result);
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class);
Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("et").rowtime())
.select($("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("et"));
midTable.printSchema();
tableEnv.createTemporaryView("midTable1", midTable);
//使用TVF的采用渐进式累计窗口进行计算
Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
"FROM TABLE(CUMULATE(\n" +
" TABLE midTable1" +
//" TABLE "+ midTable +
" , DESCRIPTOR(et)\n" +
" , INTERVAL '60' SECOND\n" +
" , INTERVAL '1' DAY))\n" +
" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");
resulTable.printSchema();
|
下面是整个的代码逻辑
|
package job;
import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;
import static org.apache.flink.table.api.Expressions.$;
/**
* @Author 昌
* @Time 2023/5/10 8:32
* 使用 flink process方法对一条流进行拆分 考虑到状态设置的问题,采用渐进式累计窗口进行计算
* 先将两条流转换后的表先简单的关联在一起,再关联的时候加上一个事件时间(row_time as cast(CURRENT_TIMESTAMP AS timestamp(3) ))
* 之后在使用渐进式窗口进行计算
*/
public class RytLogAnly4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//使用侧输出流
OutputTag<BaseInfo> requestStream = new OutputTag<BaseInfo>("requestStream") {
};
OutputTag<BaseInfo> answerStream = new OutputTag<BaseInfo>("answerStream") {
};
//1、连接测试环境kafka的数据
String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName = FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource<String> sourceStream = env.addSource(new FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
sourceStream.print("sourceStream");
//{"ip":"10.125.8.141","data":"应答: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=f7olmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
//2、对源数据进行处理,生成baseInfo基类的数据
SingleOutputStreamOperator<BaseInfo> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo>() {
@Override
public BaseInfo map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");
String[] splits = datas.split("\n");
HashMap<String, String> dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及应答时间
String time = splits[0].substring(7, 19);
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo(dataMap.get("action"), serverIp, DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
}
});
baseInfoStream.print("baseInfoStream");
//3、使用process方法进行baseInfoStream流切割
SingleOutputStreamOperator<BaseInfo> tagStream = baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));
//4、根据不同的tag进行不同的输出流设定
DataStream<BaseInfo> requestDataStream = tagStream.getSideOutput(requestStream);
DataStream<BaseInfo> answerDataStream = tagStream.getSideOutput(answerStream);
requestDataStream.print("requestDataStream");
answerDataStream.print("answerDataStream");
//5、上面的流仅仅只是携带了action编码,没有对应的action中午注释,需要去关联一下MySQL中的表
//5.1 先对请求流进行处理
SingleOutputStreamOperator<OutInfo> outRequestDataStream = requestDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
@Override
public OutInfo map(BaseInfo value) throws Exception {
//拿到数据中携带的数字的action
String actionType = value.getFuncId();
System.out.println(actionType);
String actionName = null;
Connection connection = null;
PreparedStatement ps = null;
//根据数据的action去MySQL中查找到对应的中午注释
try {
String sql = "select action_name from ActionType where action = ?";
connection = JdbcUtil.getConnection();
ps = connection.prepareStatement(sql);
ps.setString(1, actionType);
ResultSet resultSet = ps.executeQuery();
System.out.println("resultSet是" + resultSet);
if (resultSet.next()) {
actionName = resultSet.getString("action_name");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
JdbcUtil.closeResource(connection, ps);
}
// return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName,DateUtils.format(new Date()));
return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, System.currentTimeMillis() );
}
}).assignTimestampsAndWatermarks(WatermarkStrategy.<OutInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)-> element.getEventTime()));;
outRequestDataStream.print("outRequestDataStream");
//5.2 对应答流进行处理
SingleOutputStreamOperator<OutInfo> outAnswerDataStream = answerDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
@Override
public OutInfo map(BaseInfo value) throws Exception {
//拿到数据中携带的数字的action
String actionType = value.getFuncId();
System.out.println(actionType);
String actionName = null;
Connection connection = null;
PreparedStatement ps = null;
//根据数据的action去MySQL中查找到对应的中午注释
try {
String sql = "select action_name from ActionType where action = ?";
connection = JdbcUtil.getConnection();
ps = connection.prepareStatement(sql);
ps.setString(1, actionType);
ResultSet resultSet = ps.executeQuery();
System.out.println("resultSet是" + resultSet);
if (resultSet.next()) {
actionName = resultSet.getString("action_name");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
JdbcUtil.closeResource(connection, ps);
}
// return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, DateUtils.format(new Date()));
return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, System.currentTimeMillis() );
}
}).assignTimestampsAndWatermarks(WatermarkStrategy.<OutInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)-> element.getEventTime()));
outAnswerDataStream.print("outAnswerDataStream");
//6、讲两条流转换为对应的表 进行关联取最小值
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//设置状态值为1天 这种设置是有问题的 因为这种空闲状态会不断的刷新,达不到一天一算的效果
//tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));
// tableEnv.createTemporaryView("tableRequest", outRequestDataStream);
// tableEnv.createTemporaryView("tableAnswer", outAnswerDataStream);
Table tableRequest =tableEnv.fromDataStream(outRequestDataStream, $("funcId"), $("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), $("eventTime").rowtime().as("et"));
// Table tableRequest = tableEnv.fromDataStream(outRequestDataStream, Schema.newBuilder()
// .column("funcId", DataTypes.STRING())
// .column("serverIp", DataTypes.STRING())
// .column("outTime", DataTypes.BIGINT())
// .column("handleSerialNo", DataTypes.STRING())
// .column("info", DataTypes.STRING())
// .column("funcIdDesc", DataTypes.STRING())
// .column("eventTime", DataTypes.TIMESTAMP(3))
// .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
// .build());
Table tableAnswer =tableEnv.fromDataStream(outAnswerDataStream, $("funcId"), $("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), $("eventTime").rowtime());
// Table tableAnswer = tableEnv.fromDataStream(outAnswerDataStream, Schema.newBuilder()
// .column("funcId", DataTypes.STRING())
// .column("serverIp", DataTypes.STRING())
// .column("outTime", DataTypes.BIGINT())
// .column("handleSerialNo", DataTypes.STRING())
// .column("info", DataTypes.STRING())
// .column("funcIdDesc", DataTypes.STRING())
// .column("eventTime", DataTypes.TIMESTAMP(3))
// .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
// .build());
Table result = tableEnv.sqlQuery("select \n" +
"\ta.funcId as funcId ,\n" +
"\ta.funcIdDesc as funcIdDesc,\n" +
"\ta.serverIp as serverIp,\n" +
"\tb.outTime as maxTime,\n" +
"\ta.outTime as minTime,\t\n" +
"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
" a.et as et\n" +
" from " + tableRequest + " a\n " +
" inner join " + tableAnswer + " b" +
" on a.handleSerialNo=b.handleSerialNo ");
System.out.println("这个是resultTable" + result);
result.printSchema();
tableEnv.createTemporaryView("resultTable", result);
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class);
Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("et").rowtime())
.select($("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("et"));
midTable.printSchema();
tableEnv.createTemporaryView("midTable1", midTable);
//使用TVF的采用渐进式累计窗口进行计算
Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
"FROM TABLE(CUMULATE(\n" +
" TABLE midTable1" +
//" TABLE "+ midTable +
" , DESCRIPTOR(et)\n" +
" , INTERVAL '60' SECOND\n" +
" , INTERVAL '1' DAY))\n" +
" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");
resulTable.printSchema();
//tableEnv.executeSql("select * from "+resulTable).print();
// DataStream<Tuple2<Boolean, ResultInfo>> resultStream = tableEnv.toRetractStream(resulTable, ResultInfo.class);
// resultStream.print("resultStream");
// SingleOutputStreamOperator<ResultInfo> resultInfoStream = resultStream.map(new MapFunction<Tuple2<Boolean, ResultInfo>, ResultInfo>() {
// @Override
// public ResultInfo map(Tuple2<Boolean, ResultInfo> value) throws Exception {
// return value.f1;
// }
// });
// resultInfoStream.print("resultInfoStream");
// resultInfoStream.addSink(new Sink2Mysql());
env.execute();
}
}
|
| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | L Y<53...@qq.com.INVALID> |
| 发送日期 | 2023年5月20日 01:10 |
| 收件人 | user-zh<us...@flink.apache.org> |
| 主题 | 回复:table api定义rowtime未生效 |
HI,小昌同学
最有可能出问题的是midStream的水位线相关部分的代码,根据错误信息建议定位到midStream插入水位线的位置,确保水位线正确插入且使用事件时间语义而不是处理时间语义
例如:
SingleOutputStreamOperator<Event> eventStream = env
.fromElements(
.............. ).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}
)
);
我想要更多的代码尤其是midStream的部分,最好说明flink的版本
LY
531599751@qq.com
L Y
531599751@qq.com
------------------ 原始邮件 ------------------
发件人: "user-zh" <ccc0606fighting@163.com>;
发送时间: 2023年5月17日(星期三) 上午9:28
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: table api定义rowtime未生效
各位老师好,以下是我的代码:
| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime()); tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" + "FROM TABLE(CUMULATE(\n" + " TABLE midTable1"+ //" TABLE "+ midTable + " , DESCRIPTOR(eventTime)\n" + " , INTERVAL '60' SECOND\n" + " , INTERVAL '1' DAY))\n" + " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fighting@163.com
|
回复:table api定义rowtime未生效
Posted by 小昌同学 <cc...@163.com>.
数据样例如下:
| {"ip":"10.125.8.111","data":"请求: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=cccf7olmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
{"ip":"10.125.8.139","data":"应答: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=cccf7olmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"} |
| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | L Y<53...@qq.com.INVALID> |
| 发送日期 | 2023年5月23日 01:25 |
| 收件人 | user-zh<us...@flink.apache.org> |
| 主题 | 回复:table api定义rowtime未生效 |
HI,小昌同学,感谢你的提供,我已经看完了所有代码并在本地进行了测试,问题出在这一句:
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class);
你并未在新的DataStream中插入水位线,也许你下意识觉得不需要,midStream只是使用了result作为数据源和MidInfo.class作为模板创建一个数据流,除此以外都是崭新的流,想要从DataStream中转换一个Table必须确保直接的DataStream有水位线,你可以修改成这样:
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class).assignTimestampsAndWatermarks(WatermarkStrategy.<MidInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)-> element.getEventTime()));
另外我发现你似乎在每一条数据到来时都通过MapFunction去查询字典表,字典表通常修改很少,这样会导致大量的重复查询进而性能降低,建议你考虑其他方式
L Y
531599751@qq.com
L Y
531599751@qq.com
------------------ 原始邮件 ------------------
发件人: "user-zh" <ccc0606fighting@163.com>;
发送时间: 2023年5月22日(星期一) 上午9:26
收件人: "user-zh"<user-zh@flink.apache.org>;
抄送: "user-zh"<user-zh@flink.apache.org>;
主题: 回复:table api定义rowtime未生效
flink相关的版本是1.14
| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | L Y<531599751@qq.com.INVALID> |
| 发送日期 | 2023年5月20日 01:10 |
| 收件人 | user-zh<user-zh@flink.apache.org> |
| 主题 | 回复:table api定义rowtime未生效 |
HI,小昌同学
最有可能出问题的是midStream的水位线相关部分的代码,根据错误信息建议定位到midStream插入水位线的位置,确保水位线正确插入且使用事件时间语义而不是处理时间语义
例如:
SingleOutputStreamOperator<Event&gt; eventStream = env
.fromElements(
.............. ).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event&gt;forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Event&gt;() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}
)
);
我想要更多的代码尤其是midStream的部分,最好说明flink的版本
LY
531599751@qq.com
L&nbsp;Y
531599751@qq.com
&nbsp;
------------------&nbsp;原始邮件&nbsp;------------------
发件人: "user-zh" <ccc0606fighting@163.com&gt;;
发送时间:&nbsp;2023年5月17日(星期三) 上午9:28
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
主题:&nbsp;table api定义rowtime未生效
各位老师好,以下是我的代码:
| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "FROM TABLE(CUMULATE(\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " TABLE midTable1"+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //" TABLE "+ midTable +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , DESCRIPTOR(eventTime)\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , INTERVAL '60' SECOND\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , INTERVAL '1' DAY))\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fighting@163.com
|
回复:table api定义rowtime未生效
Posted by 小昌同学 <cc...@163.com>.
您好,老师,感谢您的回复;我这边按照您的建议进行如下改动:
1、在表转流中增加了水位线的定义;
2、先进行查询字典表,再进行分流;
经过以上操作,我发现数据还是没有办法正常数据,我再debug的时候发现数据运行到拆分流操作,即以下代码:数据发生了卡顿现象; 我再文末附件增加了原始文件以及相关的数据示例,麻烦老师帮忙看一下是哪里的问题,如果方便的话,想加一下老师的微信,我的微信是15956076613
|
//将完整信息的流数据依据请求还是应答进行拆解
OutputTag<BaseInfo2> requestStream = new OutputTag<BaseInfo2>("requestStream") {
};
OutputTag<BaseInfo2> answerStream = new OutputTag<BaseInfo2>("answerStream") {
};
SingleOutputStreamOperator<BaseInfo2> tagStream = completeInfoStream.process(new MyProcessFunction2(requestStream, answerStream));
DataStream<BaseInfo2> requestDataStream = tagStream.getSideOutput(requestStream).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
.withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
DataStream<BaseInfo2> answerDataStream = tagStream.getSideOutput(answerStream).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
.withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
|
| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | L Y<53...@qq.com.INVALID> |
| 发送日期 | 2023年5月23日 01:25 |
| 收件人 | user-zh<us...@flink.apache.org> |
| 主题 | 回复:table api定义rowtime未生效 |
HI,小昌同学,感谢你的提供,我已经看完了所有代码并在本地进行了测试,问题出在这一句:
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class);
你并未在新的DataStream中插入水位线,也许你下意识觉得不需要,midStream只是使用了result作为数据源和MidInfo.class作为模板创建一个数据流,除此以外都是崭新的流,想要从DataStream中转换一个Table必须确保直接的DataStream有水位线,你可以修改成这样:
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class).assignTimestampsAndWatermarks(WatermarkStrategy.<MidInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)-> element.getEventTime()));
另外我发现你似乎在每一条数据到来时都通过MapFunction去查询字典表,字典表通常修改很少,这样会导致大量的重复查询进而性能降低,建议你考虑其他方式
L Y
531599751@qq.com
L Y
531599751@qq.com
------------------ 原始邮件 ------------------
发件人: "user-zh" <ccc0606fighting@163.com>;
发送时间: 2023年5月22日(星期一) 上午9:26
收件人: "user-zh"<user-zh@flink.apache.org>;
抄送: "user-zh"<user-zh@flink.apache.org>;
主题: 回复:table api定义rowtime未生效
flink相关的版本是1.14
| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | L Y<531599751@qq.com.INVALID> |
| 发送日期 | 2023年5月20日 01:10 |
| 收件人 | user-zh<user-zh@flink.apache.org> |
| 主题 | 回复:table api定义rowtime未生效 |
HI,小昌同学
最有可能出问题的是midStream的水位线相关部分的代码,根据错误信息建议定位到midStream插入水位线的位置,确保水位线正确插入且使用事件时间语义而不是处理时间语义
例如:
SingleOutputStreamOperator<Event&gt; eventStream = env
.fromElements(
.............. ).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event&gt;forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Event&gt;() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}
)
);
我想要更多的代码尤其是midStream的部分,最好说明flink的版本
LY
531599751@qq.com
L&nbsp;Y
531599751@qq.com
&nbsp;
------------------&nbsp;原始邮件&nbsp;------------------
发件人: "user-zh" <ccc0606fighting@163.com&gt;;
发送时间:&nbsp;2023年5月17日(星期三) 上午9:28
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
主题:&nbsp;table api定义rowtime未生效
各位老师好,以下是我的代码:
| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "FROM TABLE(CUMULATE(\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " TABLE midTable1"+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //" TABLE "+ midTable +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , DESCRIPTOR(eventTime)\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , INTERVAL '60' SECOND\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , INTERVAL '1' DAY))\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fighting@163.com
|
回复:table api定义rowtime未生效
Posted by L Y <53...@qq.com.INVALID>.
HI,小昌同学,感谢你的提供,我已经看完了所有代码并在本地进行了测试,问题出在这一句:
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class);
你并未在新的DataStream中插入水位线,也许你下意识觉得不需要,midStream只是使用了result作为数据源和MidInfo.class作为模板创建一个数据流,除此以外都是崭新的流,想要从DataStream中转换一个Table必须确保直接的DataStream有水位线,你可以修改成这样:
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class).assignTimestampsAndWatermarks(WatermarkStrategy.<MidInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)-> element.getEventTime()));
另外我发现你似乎在每一条数据到来时都通过MapFunction去查询字典表,字典表通常修改很少,这样会导致大量的重复查询进而性能降低,建议你考虑其他方式
L Y
531599751@qq.com
L Y
531599751@qq.com
------------------ 原始邮件 ------------------
发件人: "user-zh" <ccc0606fighting@163.com>;
发送时间: 2023年5月22日(星期一) 上午9:26
收件人: "user-zh"<user-zh@flink.apache.org>;
抄送: "user-zh"<user-zh@flink.apache.org>;
主题: 回复:table api定义rowtime未生效
flink相关的版本是1.14
| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | L Y<531599751@qq.com.INVALID> |
| 发送日期 | 2023年5月20日 01:10 |
| 收件人 | user-zh<user-zh@flink.apache.org> |
| 主题 | 回复:table api定义rowtime未生效 |
HI,小昌同学
最有可能出问题的是midStream的水位线相关部分的代码,根据错误信息建议定位到midStream插入水位线的位置,确保水位线正确插入且使用事件时间语义而不是处理时间语义
例如:
SingleOutputStreamOperator<Event&gt; eventStream = env
.fromElements(
.............. ).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event&gt;forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Event&gt;() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}
)
);
我想要更多的代码尤其是midStream的部分,最好说明flink的版本
LY
531599751@qq.com
L&nbsp;Y
531599751@qq.com
&nbsp;
------------------&nbsp;原始邮件&nbsp;------------------
发件人: "user-zh" <ccc0606fighting@163.com&gt;;
发送时间:&nbsp;2023年5月17日(星期三) 上午9:28
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
主题:&nbsp;table api定义rowtime未生效
各位老师好,以下是我的代码:
| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "FROM TABLE(CUMULATE(\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " TABLE midTable1"+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //" TABLE "+ midTable +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , DESCRIPTOR(eventTime)\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , INTERVAL '60' SECOND\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , INTERVAL '1' DAY))\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fighting@163.com
|
回复:table api定义rowtime未生效
Posted by 小昌同学 <cc...@163.com>.
flink相关的版本是1.14
| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | L Y<53...@qq.com.INVALID> |
| 发送日期 | 2023年5月20日 01:10 |
| 收件人 | user-zh<us...@flink.apache.org> |
| 主题 | 回复:table api定义rowtime未生效 |
HI,小昌同学
最有可能出问题的是midStream的水位线相关部分的代码,根据错误信息建议定位到midStream插入水位线的位置,确保水位线正确插入且使用事件时间语义而不是处理时间语义
例如:
SingleOutputStreamOperator<Event> eventStream = env
.fromElements(
.............. ).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}
)
);
我想要更多的代码尤其是midStream的部分,最好说明flink的版本
LY
531599751@qq.com
L Y
531599751@qq.com
------------------ 原始邮件 ------------------
发件人: "user-zh" <ccc0606fighting@163.com>;
发送时间: 2023年5月17日(星期三) 上午9:28
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: table api定义rowtime未生效
各位老师好,以下是我的代码:
| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime()); tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" + "FROM TABLE(CUMULATE(\n" + " TABLE midTable1"+ //" TABLE "+ midTable + " , DESCRIPTOR(eventTime)\n" + " , INTERVAL '60' SECOND\n" + " , INTERVAL '1' DAY))\n" + " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fighting@163.com
|
回复:table api定义rowtime未生效
Posted by L Y <53...@qq.com.INVALID>.
HI,小昌同学
最有可能出问题的是midStream的水位线相关部分的代码,根据错误信息建议定位到midStream插入水位线的位置,确保水位线正确插入且使用事件时间语义而不是处理时间语义
例如:
SingleOutputStreamOperator<Event> eventStream = env
.fromElements(
.............. ).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}
)
);
我想要更多的代码尤其是midStream的部分,最好说明flink的版本
LY
531599751@qq.com
L Y
531599751@qq.com
------------------ 原始邮件 ------------------
发件人: "user-zh" <ccc0606fighting@163.com>;
发送时间: 2023年5月17日(星期三) 上午9:28
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: table api定义rowtime未生效
各位老师好,以下是我的代码:
| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime()); tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" + "FROM TABLE(CUMULATE(\n" + " TABLE midTable1"+ //" TABLE "+ midTable + " , DESCRIPTOR(eventTime)\n" + " , INTERVAL '60' SECOND\n" + " , INTERVAL '1' DAY))\n" + " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fighting@163.com
|