You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小昌同学 <cc...@163.com> on 2023/05/17 01:28:07 UTC

table api定义rowtime未生效

各位老师好,以下是我的代码:

| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime());        tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +                "FROM TABLE(CUMULATE(\n" +                " TABLE midTable1"+                //" TABLE "+ midTable +                " , DESCRIPTOR(eventTime)\n" +                " , INTERVAL '60' SECOND\n" +                " , INTERVAL '1' DAY))\n" +                " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fighting@163.com
|

回复:table api定义rowtime未生效

Posted by 小昌同学 <cc...@163.com>.
你好,老师,以下是midStream的代码来源;文件最后我加上了整个类的编写逻辑
|
//6、讲两条流转换为对应的表 进行关联取最小值
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//设置状态值为1天 这种设置是有问题的 因为这种空闲状态会不断的刷新,达不到一天一算的效果
//tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));
//        tableEnv.createTemporaryView("tableRequest", outRequestDataStream);
//        tableEnv.createTemporaryView("tableAnswer", outAnswerDataStream);
Table tableRequest =tableEnv.fromDataStream(outRequestDataStream, $("funcId"), $("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), $("eventTime").rowtime().as("et"));
//        Table tableRequest = tableEnv.fromDataStream(outRequestDataStream, Schema.newBuilder()
//                .column("funcId", DataTypes.STRING())
//                .column("serverIp", DataTypes.STRING())
//                .column("outTime", DataTypes.BIGINT())
//                .column("handleSerialNo", DataTypes.STRING())
//                .column("info", DataTypes.STRING())
//                .column("funcIdDesc", DataTypes.STRING())
//                .column("eventTime", DataTypes.TIMESTAMP(3))
//                .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
//                .build());
Table tableAnswer =tableEnv.fromDataStream(outAnswerDataStream, $("funcId"), $("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), $("eventTime").rowtime());
//        Table tableAnswer = tableEnv.fromDataStream(outAnswerDataStream, Schema.newBuilder()
//                .column("funcId", DataTypes.STRING())
//                .column("serverIp", DataTypes.STRING())
//                .column("outTime", DataTypes.BIGINT())
//                .column("handleSerialNo", DataTypes.STRING())
//                .column("info", DataTypes.STRING())
//                .column("funcIdDesc", DataTypes.STRING())
//                .column("eventTime", DataTypes.TIMESTAMP(3))
//                .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
//                .build());


Table result = tableEnv.sqlQuery("select \n" +
"\ta.funcId as funcId ,\n" +
"\ta.funcIdDesc as funcIdDesc,\n" +
"\ta.serverIp as serverIp,\n" +
"\tb.outTime as maxTime,\n" +
"\ta.outTime as minTime,\t\n" +
"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
" a.et  as et\n" +
" from " + tableRequest + " a\n " +
" inner join " + tableAnswer + " b" +
" on a.handleSerialNo=b.handleSerialNo ");
        System.out.println("这个是resultTable" + result);
        result.printSchema();
        tableEnv.createTemporaryView("resultTable", result);

        DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class);
        Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("et").rowtime())
                .select($("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("et"));
        midTable.printSchema();
        tableEnv.createTemporaryView("midTable1", midTable);

//使用TVF的采用渐进式累计窗口进行计算
Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
"FROM TABLE(CUMULATE(\n" +
" TABLE midTable1" +
//" TABLE "+ midTable +
" , DESCRIPTOR(et)\n" +
" , INTERVAL '60' SECOND\n" +
" , INTERVAL '1' DAY))\n" +
" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");
        resulTable.printSchema();
|


下面是整个的代码逻辑
|
package job;

import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Author 昌
* @Time 2023/5/10 8:32
 * 使用 flink process方法对一条流进行拆分 考虑到状态设置的问题,采用渐进式累计窗口进行计算
* 先将两条流转换后的表先简单的关联在一起,再关联的时候加上一个事件时间(row_time as cast(CURRENT_TIMESTAMP  AS timestamp(3) ))
* 之后在使用渐进式窗口进行计算
*/
public class RytLogAnly4 {
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


//使用侧输出流
OutputTag<BaseInfo> requestStream = new OutputTag<BaseInfo>("requestStream") {
        };
        OutputTag<BaseInfo> answerStream = new OutputTag<BaseInfo>("answerStream") {
        };

//1、连接测试环境kafka的数据
String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
        String topicName = FlinkConfig.config.getProperty("dev_topicName");
        String groupId = FlinkConfig.config.getProperty("dev_groupId");
        String devMode = FlinkConfig.config.getProperty("dev_mode");
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", servers);
        prop.setProperty("group.id", groupId);
        prop.setProperty("auto.offset.reset", devMode);
        DataStreamSource<String> sourceStream = env.addSource(new FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
        sourceStream.print("sourceStream");
//{"ip":"10.125.8.141","data":"应答: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=f7olmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}
        //2、对源数据进行处理,生成baseInfo基类的数据
SingleOutputStreamOperator<BaseInfo> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo>() {
@Override
public BaseInfo map(String value) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");

                String[] splits = datas.split("\n");
                HashMap<String, String> dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及应答时间
String time = splits[0].substring(7, 19);
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
                        splits[i] = splits[i].replaceFirst("=", "&");
                        String[] temp = splits[i].split("&");
if (temp.length > 1) {
                            dataMap.put(temp[0].toLowerCase(), temp[1]);
                        }
                    }
                }
return new BaseInfo(dataMap.get("action"), serverIp, DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
            }
        });
        baseInfoStream.print("baseInfoStream");
//3、使用process方法进行baseInfoStream流切割
SingleOutputStreamOperator<BaseInfo> tagStream = baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));

//4、根据不同的tag进行不同的输出流设定
DataStream<BaseInfo> requestDataStream = tagStream.getSideOutput(requestStream);
        DataStream<BaseInfo> answerDataStream = tagStream.getSideOutput(answerStream);

        requestDataStream.print("requestDataStream");
        answerDataStream.print("answerDataStream");

//5、上面的流仅仅只是携带了action编码,没有对应的action中午注释,需要去关联一下MySQL中的表
//5.1 先对请求流进行处理
SingleOutputStreamOperator<OutInfo> outRequestDataStream = requestDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
@Override
public OutInfo map(BaseInfo value) throws Exception {
//拿到数据中携带的数字的action
String actionType = value.getFuncId();
                System.out.println(actionType);
                String actionName = null;
                Connection connection = null;
                PreparedStatement ps = null;

//根据数据的action去MySQL中查找到对应的中午注释
try {
                    String sql = "select action_name from ActionType where action = ?";
                    connection = JdbcUtil.getConnection();
                    ps = connection.prepareStatement(sql);
                    ps.setString(1, actionType);
                    ResultSet resultSet = ps.executeQuery();
                    System.out.println("resultSet是" + resultSet);
if (resultSet.next()) {
                        actionName = resultSet.getString("action_name");
                    }
                } catch (Exception e) {
throw new RuntimeException(e);
                } finally {
                    JdbcUtil.closeResource(connection, ps);
                }
//                return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName,DateUtils.format(new Date()));
return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, System.currentTimeMillis() );
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<OutInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)-> element.getEventTime()));;
        outRequestDataStream.print("outRequestDataStream");


//5.2 对应答流进行处理
SingleOutputStreamOperator<OutInfo> outAnswerDataStream = answerDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
@Override
public OutInfo map(BaseInfo value) throws Exception {
//拿到数据中携带的数字的action
String actionType = value.getFuncId();
                System.out.println(actionType);
                String actionName = null;
                Connection connection = null;
                PreparedStatement ps = null;

//根据数据的action去MySQL中查找到对应的中午注释
try {
                    String sql = "select action_name from ActionType where action = ?";
                    connection = JdbcUtil.getConnection();
                    ps = connection.prepareStatement(sql);
                    ps.setString(1, actionType);
                    ResultSet resultSet = ps.executeQuery();
                    System.out.println("resultSet是" + resultSet);
if (resultSet.next()) {
                        actionName = resultSet.getString("action_name");
                    }
                } catch (Exception e) {
throw new RuntimeException(e);
                } finally {
                    JdbcUtil.closeResource(connection, ps);
                }
//                return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, DateUtils.format(new Date()));
return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, System.currentTimeMillis() );
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<OutInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)-> element.getEventTime()));
        outAnswerDataStream.print("outAnswerDataStream");


//6、讲两条流转换为对应的表 进行关联取最小值
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//设置状态值为1天 这种设置是有问题的 因为这种空闲状态会不断的刷新,达不到一天一算的效果
//tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));
//        tableEnv.createTemporaryView("tableRequest", outRequestDataStream);
//        tableEnv.createTemporaryView("tableAnswer", outAnswerDataStream);
Table tableRequest =tableEnv.fromDataStream(outRequestDataStream, $("funcId"), $("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), $("eventTime").rowtime().as("et"));
//        Table tableRequest = tableEnv.fromDataStream(outRequestDataStream, Schema.newBuilder()
//                .column("funcId", DataTypes.STRING())
//                .column("serverIp", DataTypes.STRING())
//                .column("outTime", DataTypes.BIGINT())
//                .column("handleSerialNo", DataTypes.STRING())
//                .column("info", DataTypes.STRING())
//                .column("funcIdDesc", DataTypes.STRING())
//                .column("eventTime", DataTypes.TIMESTAMP(3))
//                .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
//                .build());
Table tableAnswer =tableEnv.fromDataStream(outAnswerDataStream, $("funcId"), $("serverIp"), $("outTime"), $("handleSerialNo"), $("info"), $("funcIdDesc"), $("eventTime").rowtime());
//        Table tableAnswer = tableEnv.fromDataStream(outAnswerDataStream, Schema.newBuilder()
//                .column("funcId", DataTypes.STRING())
//                .column("serverIp", DataTypes.STRING())
//                .column("outTime", DataTypes.BIGINT())
//                .column("handleSerialNo", DataTypes.STRING())
//                .column("info", DataTypes.STRING())
//                .column("funcIdDesc", DataTypes.STRING())
//                .column("eventTime", DataTypes.TIMESTAMP(3))
//                .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
//                .build());


Table result = tableEnv.sqlQuery("select \n" +
"\ta.funcId as funcId ,\n" +
"\ta.funcIdDesc as funcIdDesc,\n" +
"\ta.serverIp as serverIp,\n" +
"\tb.outTime as maxTime,\n" +
"\ta.outTime as minTime,\t\n" +
"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
" a.et  as et\n" +
" from " + tableRequest + " a\n " +
" inner join " + tableAnswer + " b" +
" on a.handleSerialNo=b.handleSerialNo ");
        System.out.println("这个是resultTable" + result);
        result.printSchema();
        tableEnv.createTemporaryView("resultTable", result);

        DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class);
        Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("et").rowtime())
                .select($("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("et"));
        midTable.printSchema();
        tableEnv.createTemporaryView("midTable1", midTable);

//使用TVF的采用渐进式累计窗口进行计算
Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
"FROM TABLE(CUMULATE(\n" +
" TABLE midTable1" +
//" TABLE "+ midTable +
" , DESCRIPTOR(et)\n" +
" , INTERVAL '60' SECOND\n" +
" , INTERVAL '1' DAY))\n" +
" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");
        resulTable.printSchema();
//tableEnv.executeSql("select * from "+resulTable).print();

//        DataStream<Tuple2<Boolean, ResultInfo>> resultStream = tableEnv.toRetractStream(resulTable, ResultInfo.class);
//        resultStream.print("resultStream");
//        SingleOutputStreamOperator<ResultInfo> resultInfoStream = resultStream.map(new MapFunction<Tuple2<Boolean, ResultInfo>, ResultInfo>() {
//            @Override
//            public ResultInfo map(Tuple2<Boolean, ResultInfo> value) throws Exception {
//                return value.f1;
//            }
//        });
//        resultInfoStream.print("resultInfoStream");
//        resultInfoStream.addSink(new Sink2Mysql());


env.execute();


    }
}


|
| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | L Y<53...@qq.com.INVALID> |
| 发送日期 | 2023年5月20日 01:10 |
| 收件人 | user-zh<us...@flink.apache.org> |
| 主题 | 回复:table api定义rowtime未生效 |
HI,小昌同学
最有可能出问题的是midStream的水位线相关部分的代码,根据错误信息建议定位到midStream插入水位线的位置,确保水位线正确插入且使用事件时间语义而不是处理时间语义
例如:


SingleOutputStreamOperator<Event&gt; eventStream = env
.fromElements(
..............                ).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event&gt;forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Event&gt;() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}
)
);





我想要更多的代码尤其是midStream的部分,最好说明flink的版本


LY
531599751@qq.com




L&nbsp;Y
531599751@qq.com



&nbsp;




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <ccc0606fighting@163.com&gt;;
发送时间:&nbsp;2023年5月17日(星期三) 上午9:28
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;table api定义rowtime未生效



各位老师好,以下是我的代码:

| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "FROM TABLE(CUMULATE(\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " TABLE midTable1"+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //" TABLE "+ midTable +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , DESCRIPTOR(eventTime)\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , INTERVAL '60' SECOND\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , INTERVAL '1' DAY))\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fighting@163.com
|

回复:table api定义rowtime未生效

Posted by 小昌同学 <cc...@163.com>.
数据样例如下:
| {"ip":"10.125.8.111","data":"请求: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=cccf7olmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"}


{"ip":"10.125.8.139","data":"应答: -- 14:28:05.111 -- <44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=cccf7olmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"} |


| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | L Y<53...@qq.com.INVALID> |
| 发送日期 | 2023年5月23日 01:25 |
| 收件人 | user-zh<us...@flink.apache.org> |
| 主题 | 回复:table api定义rowtime未生效 |
HI,小昌同学,感谢你的提供,我已经看完了所有代码并在本地进行了测试,问题出在这一句:


DataStream<MidInfo&gt; midStream = tableEnv.toAppendStream(result, MidInfo.class);


你并未在新的DataStream中插入水位线,也许你下意识觉得不需要,midStream只是使用了result作为数据源和MidInfo.class作为模板创建一个数据流,除此以外都是崭新的流,想要从DataStream中转换一个Table必须确保直接的DataStream有水位线,你可以修改成这样:


DataStream<MidInfo&gt; midStream = tableEnv.toAppendStream(result, MidInfo.class).assignTimestampsAndWatermarks(WatermarkStrategy.<MidInfo&gt;forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)-&gt; element.getEventTime()));


另外我发现你似乎在每一条数据到来时都通过MapFunction去查询字典表,字典表通常修改很少,这样会导致大量的重复查询进而性能降低,建议你考虑其他方式


L Y


531599751@qq.com





L&nbsp;Y
531599751@qq.com



&nbsp;




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <ccc0606fighting@163.com&gt;;
发送时间:&nbsp;2023年5月22日(星期一) 上午9:26
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
抄送:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
主题:&nbsp;回复:table api定义rowtime未生效



flink相关的版本是1.14


| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | L Y<531599751@qq.com.INVALID&gt; |
| 发送日期 | 2023年5月20日 01:10 |
| 收件人 | user-zh<user-zh@flink.apache.org&gt; |
| 主题 | 回复:table api定义rowtime未生效 |
HI,小昌同学
最有可能出问题的是midStream的水位线相关部分的代码,根据错误信息建议定位到midStream插入水位线的位置,确保水位线正确插入且使用事件时间语义而不是处理时间语义
例如:


SingleOutputStreamOperator<Event&amp;gt; eventStream = env
.fromElements(
..............&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event&amp;gt;forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Event&amp;gt;() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}
)
);





我想要更多的代码尤其是midStream的部分,最好说明flink的版本


LY
531599751@qq.com




L&amp;nbsp;Y
531599751@qq.com



&amp;nbsp;




------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <ccc0606fighting@163.com&amp;gt;;
发送时间:&amp;nbsp;2023年5月17日(星期三) 上午9:28
收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;

主题:&amp;nbsp;table api定义rowtime未生效



各位老师好,以下是我的代码:

| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime());&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "FROM TABLE(CUMULATE(\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " TABLE midTable1"+&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; //" TABLE "+ midTable +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " , DESCRIPTOR(eventTime)\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " , INTERVAL '60' SECOND\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " , INTERVAL '1' DAY))\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fighting@163.com
|

回复:table api定义rowtime未生效

Posted by 小昌同学 <cc...@163.com>.
您好,老师,感谢您的回复;我这边按照您的建议进行如下改动:
1、在表转流中增加了水位线的定义;
2、先进行查询字典表,再进行分流;
经过以上操作,我发现数据还是没有办法正常数据,我再debug的时候发现数据运行到拆分流操作,即以下代码:数据发生了卡顿现象; 我再文末附件增加了原始文件以及相关的数据示例,麻烦老师帮忙看一下是哪里的问题,如果方便的话,想加一下老师的微信,我的微信是15956076613
|
//将完整信息的流数据依据请求还是应答进行拆解
OutputTag<BaseInfo2> requestStream = new OutputTag<BaseInfo2>("requestStream") {
};
OutputTag<BaseInfo2> answerStream = new OutputTag<BaseInfo2>("answerStream") {
};

SingleOutputStreamOperator<BaseInfo2> tagStream = completeInfoStream.process(new MyProcessFunction2(requestStream, answerStream));
DataStream<BaseInfo2> requestDataStream = tagStream.getSideOutput(requestStream).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
        .withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
DataStream<BaseInfo2> answerDataStream = tagStream.getSideOutput(answerStream).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
        .withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()));
|


| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | L Y<53...@qq.com.INVALID> |
| 发送日期 | 2023年5月23日 01:25 |
| 收件人 | user-zh<us...@flink.apache.org> |
| 主题 | 回复:table api定义rowtime未生效 |
HI,小昌同学,感谢你的提供,我已经看完了所有代码并在本地进行了测试,问题出在这一句:


DataStream<MidInfo&gt; midStream = tableEnv.toAppendStream(result, MidInfo.class);


你并未在新的DataStream中插入水位线,也许你下意识觉得不需要,midStream只是使用了result作为数据源和MidInfo.class作为模板创建一个数据流,除此以外都是崭新的流,想要从DataStream中转换一个Table必须确保直接的DataStream有水位线,你可以修改成这样:


DataStream<MidInfo&gt; midStream = tableEnv.toAppendStream(result, MidInfo.class).assignTimestampsAndWatermarks(WatermarkStrategy.<MidInfo&gt;forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)-&gt; element.getEventTime()));


另外我发现你似乎在每一条数据到来时都通过MapFunction去查询字典表,字典表通常修改很少,这样会导致大量的重复查询进而性能降低,建议你考虑其他方式


L Y


531599751@qq.com





L&nbsp;Y
531599751@qq.com



&nbsp;




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <ccc0606fighting@163.com&gt;;
发送时间:&nbsp;2023年5月22日(星期一) 上午9:26
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
抄送:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
主题:&nbsp;回复:table api定义rowtime未生效



flink相关的版本是1.14


| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | L Y<531599751@qq.com.INVALID&gt; |
| 发送日期 | 2023年5月20日 01:10 |
| 收件人 | user-zh<user-zh@flink.apache.org&gt; |
| 主题 | 回复:table api定义rowtime未生效 |
HI,小昌同学
最有可能出问题的是midStream的水位线相关部分的代码,根据错误信息建议定位到midStream插入水位线的位置,确保水位线正确插入且使用事件时间语义而不是处理时间语义
例如:


SingleOutputStreamOperator<Event&amp;gt; eventStream = env
.fromElements(
..............&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event&amp;gt;forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Event&amp;gt;() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}
)
);





我想要更多的代码尤其是midStream的部分,最好说明flink的版本


LY
531599751@qq.com




L&amp;nbsp;Y
531599751@qq.com



&amp;nbsp;




------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <ccc0606fighting@163.com&amp;gt;;
发送时间:&amp;nbsp;2023年5月17日(星期三) 上午9:28
收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;

主题:&amp;nbsp;table api定义rowtime未生效



各位老师好,以下是我的代码:

| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime());&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "FROM TABLE(CUMULATE(\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " TABLE midTable1"+&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; //" TABLE "+ midTable +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " , DESCRIPTOR(eventTime)\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " , INTERVAL '60' SECOND\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " , INTERVAL '1' DAY))\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fighting@163.com
|

回复:table api定义rowtime未生效

Posted by L Y <53...@qq.com.INVALID>.
HI,小昌同学,感谢你的提供,我已经看完了所有代码并在本地进行了测试,问题出在这一句:


DataStream<MidInfo&gt; midStream = tableEnv.toAppendStream(result, MidInfo.class);


你并未在新的DataStream中插入水位线,也许你下意识觉得不需要,midStream只是使用了result作为数据源和MidInfo.class作为模板创建一个数据流,除此以外都是崭新的流,想要从DataStream中转换一个Table必须确保直接的DataStream有水位线,你可以修改成这样:


DataStream<MidInfo&gt; midStream = tableEnv.toAppendStream(result, MidInfo.class).assignTimestampsAndWatermarks(WatermarkStrategy.<MidInfo&gt;forBoundedOutOfOrderness(Duration.ofSeconds(3L)).withTimestampAssigner((element,recordTimestamp)-&gt; element.getEventTime()));


另外我发现你似乎在每一条数据到来时都通过MapFunction去查询字典表,字典表通常修改很少,这样会导致大量的重复查询进而性能降低,建议你考虑其他方式


L Y


531599751@qq.com





L&nbsp;Y
531599751@qq.com



&nbsp;




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <ccc0606fighting@163.com&gt;;
发送时间:&nbsp;2023年5月22日(星期一) 上午9:26
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
抄送:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
主题:&nbsp;回复:table api定义rowtime未生效



flink相关的版本是1.14


| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | L Y<531599751@qq.com.INVALID&gt; |
| 发送日期 | 2023年5月20日 01:10 |
| 收件人 | user-zh<user-zh@flink.apache.org&gt; |
| 主题 | 回复:table api定义rowtime未生效 |
HI,小昌同学
最有可能出问题的是midStream的水位线相关部分的代码,根据错误信息建议定位到midStream插入水位线的位置,确保水位线正确插入且使用事件时间语义而不是处理时间语义
例如:


SingleOutputStreamOperator<Event&amp;gt; eventStream = env
.fromElements(
..............&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event&amp;gt;forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Event&amp;gt;() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}
)
);





我想要更多的代码尤其是midStream的部分,最好说明flink的版本


LY
531599751@qq.com




L&amp;nbsp;Y
531599751@qq.com



&amp;nbsp;




------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <ccc0606fighting@163.com&amp;gt;;
发送时间:&amp;nbsp;2023年5月17日(星期三) 上午9:28
收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;

主题:&amp;nbsp;table api定义rowtime未生效



各位老师好,以下是我的代码:

| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime());&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; "FROM TABLE(CUMULATE(\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " TABLE midTable1"+&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; //" TABLE "+ midTable +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " , DESCRIPTOR(eventTime)\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " , INTERVAL '60' SECOND\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " , INTERVAL '1' DAY))\n" +&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fighting@163.com
|

回复:table api定义rowtime未生效

Posted by 小昌同学 <cc...@163.com>.
flink相关的版本是1.14


| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | L Y<53...@qq.com.INVALID> |
| 发送日期 | 2023年5月20日 01:10 |
| 收件人 | user-zh<us...@flink.apache.org> |
| 主题 | 回复:table api定义rowtime未生效 |
HI,小昌同学
最有可能出问题的是midStream的水位线相关部分的代码,根据错误信息建议定位到midStream插入水位线的位置,确保水位线正确插入且使用事件时间语义而不是处理时间语义
例如:


SingleOutputStreamOperator<Event&gt; eventStream = env
.fromElements(
..............                ).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event&gt;forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Event&gt;() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}
)
);





我想要更多的代码尤其是midStream的部分,最好说明flink的版本


LY
531599751@qq.com




L&nbsp;Y
531599751@qq.com



&nbsp;




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <ccc0606fighting@163.com&gt;;
发送时间:&nbsp;2023年5月17日(星期三) 上午9:28
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;table api定义rowtime未生效



各位老师好,以下是我的代码:

| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "FROM TABLE(CUMULATE(\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " TABLE midTable1"+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //" TABLE "+ midTable +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , DESCRIPTOR(eventTime)\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , INTERVAL '60' SECOND\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , INTERVAL '1' DAY))\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fighting@163.com
|

回复:table api定义rowtime未生效

Posted by L Y <53...@qq.com.INVALID>.
HI,小昌同学
最有可能出问题的是midStream的水位线相关部分的代码,根据错误信息建议定位到midStream插入水位线的位置,确保水位线正确插入且使用事件时间语义而不是处理时间语义
例如:


SingleOutputStreamOperator<Event&gt; eventStream = env
        .fromElements(
..............                ).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event&gt;forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<Event&gt;() {
                                    @Override
                                    public long extractTimestamp(Event event, long l) {
                                        return event.timestamp;
                                    }
                                }
                        )
        );





我想要更多的代码尤其是midStream的部分,最好说明flink的版本


LY
531599751@qq.com




L&nbsp;Y
531599751@qq.com



&nbsp;




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <ccc0606fighting@163.com&gt;;
发送时间:&nbsp;2023年5月17日(星期三) 上午9:28
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;table api定义rowtime未生效



各位老师好,以下是我的代码:

| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "FROM TABLE(CUMULATE(\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " TABLE midTable1"+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //" TABLE "+ midTable +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , DESCRIPTOR(eventTime)\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , INTERVAL '60' SECOND\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " , INTERVAL '1' DAY))\n" +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; " GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fighting@163.com
|