Posted to user-zh@flink.apache.org by 陈隽尧 <ch...@htsc.com> on 2023/03/08 13:31:12 UTC

Question about Flink

Hello,

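I am a new Flink user. A project I'm working on needs Flink to implement a business feature, but I have run into some difficulties and would like to ask whether there is a suitable solution. I look forward to hearing from you.

Background: we need to compute some account-level metrics from the stock trade stream and the stock quote stream (to simplify, assume the only account metrics are position quantity, average buy price, and market value). The front-end page refreshes every 20 s. We want to implement the metric computation with Flink's DataStream API but have run into a problem. Our initial idea is below; guidance from the Flink experts would be much appreciated.

Proposed design: let stream1 be the stock trade stream and stream2 the stock quote stream, and use stream1.keyBy().connect(stream2.keyBy()).process(), keyed by stock code. In the process function: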

 

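- open(): load the metric values precomputed at the start of the day into a ListState. Each object in the ListState has four fields: account, position quantity, average buy price, and market value (all start-of-day values).

- processElement1: for each stock trade, compute the metrics that depend only on trades (e.g. position quantity and average buy price) and update the ListState.

- processElement2: only cache quotes in state (a MapState keyed by stock code), keeping the latest quote for each instrument. The system refreshes data only every 20 s while quotes arrive relatively often (about every 3 s), so there is no need to recompute on every quote.

- Register a timer that fires every 20 s. In onTimer(), compute the market value from the ListState above and the latest quotes in the MapState, and write it back to the market-value field in the ListState (market value = latest position * latest quote price). onTimer() emits only the ListState entries that changed during the last 20 s to the next operator, which performs the aggregation.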
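
The per-trade and per-timer updates in the steps above can be sketched in plain Java. This is a minimal simulation, not Flink code: the Holding class and its methods are hypothetical, and the average-buy-price rule (weighted-average cost on buys, unchanged on sells, positive quantity meaning a buy) is an assumption, since the message does not define it.

```java
// Hypothetical per-account holding record for one stock; mirrors the four
// fields described above (account, position, average buy price, market value).
// Plain Java only: in Flink this object would be stored in keyed state.
final class Holding {
    final String account;
    long position;        // shares currently held
    double avgBuyPrice;   // weighted-average cost of the shares held (assumed rule)
    double marketValue;   // position * latest price, refreshed by the timer

    Holding(String account, long position, double avgBuyPrice, double marketValue) {
        this.account = account;
        this.position = position;
        this.avgBuyPrice = avgBuyPrice;
        this.marketValue = marketValue;
    }

    // The processElement1 step: apply one trade.
    // Assumption: quantity > 0 is a buy, quantity < 0 is a sell.
    void applyTrade(long quantity, double price) {
        if (quantity > 0) {
            // Buying blends the new shares' price into the average cost.
            double totalCost = position * avgBuyPrice + quantity * price;
            position += quantity;
            avgBuyPrice = totalCost / position;
        } else {
            // Selling reduces the position but leaves the average buy price unchanged.
            position += quantity;
        }
    }

    // The onTimer step: market value = latest position * latest quote price.
    void refreshMarketValue(double latestPrice) {
        marketValue = position * latestPrice;
    }
}
```

For example, starting from 100 shares at 10.0, buying 100 at 12.0 gives 200 shares at an average of 11.0; selling 50 leaves 150 shares at 11.0, and a latest price of 13.0 gives a market value of 1950.0. One caveat when moving this into Flink: keyed state such as ListState is only accessible while a key is in context (in processElement1/2 or onTimer), not in open(), so the start-of-day values would likely have to be loaded lazily when the first element for a stock code arrives.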

 

 

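The problem: how can I tell which ListState entries changed during the 20 s before each onTimer() call? My understanding is that processElement and onTimer run in two separate threads. If processElement marks each modified entry in the ListState with a "modified" flag, and onTimer collects the flagged entries and then clears the flags, correctness would require thread synchronization, and I suspect that doing thread synchronization inside Flink is not appropriate.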
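
For what it's worth, Flink's ProcessFunction documentation states that Flink synchronizes invocations of processElement() and onTimer(), so users do not have to worry about concurrent modification of state. Under that guarantee, a "changed since the last timer" marker can be set in processElement1 and read and cleared in onTimer without explicit locking. The plain-Java sketch below simulates that single-threaded sequence; ChangeTracker, markChanged and drainChanged are hypothetical names, and the LinkedHashSet stands in for a keyed state primitive such as a MapState.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a per-key "dirty set". In Flink this would live in
// keyed state (e.g. a MapState); here a LinkedHashSet plays that role.
// No locks are used: Flink does not call processElement and onTimer concurrently.
final class ChangeTracker {
    private final Set<String> changedAccounts = new LinkedHashSet<>();

    // processElement1: remember which account a trade touched.
    void markChanged(String account) {
        changedAccounts.add(account);
    }

    // onTimer: return the accounts touched since the previous timer
    // (in first-touched order) and reset the set for the next 20 s interval.
    List<String> drainChanged() {
        List<String> result = new ArrayList<>(changedAccounts);
        changedAccounts.clear();
        return result;
    }
}
```

Since the stream is keyed by stock code, each stock's onTimer call would drain only that stock's set, and an account touched by several trades in one interval is emitted once.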

 

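- Why not use stream1.coGroup(stream2).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.hours(1))).apply(<CoGroupFunction>): a CoGroupFunction only fires when the window closes, whereas the trade stream can be processed record by record, and we don't want to turn it into micro-batch processing.

- Why use a timer instead of computing in processElement2: the results only need to be refreshed every 20 s and quotes update frequently, so recomputing on every quote would waste compute.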
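
One way to keep the computation out of processElement2 while still refreshing every 20 s is timer coalescing, which Flink's ProcessFunction documentation describes: the TimerService keeps at most one timer per key and timestamp, so if every quote registers a processing-time timer rounded up to the next 20 s boundary (for example ctx.timerService().registerProcessingTimeTimer(...) fed from ctx.timerService().currentProcessingTime()), all quotes in one interval collapse into a single onTimer() call per stock. The plain-Java sketch below shows only the rounding and simulates the deduplication with a TreeSet; TimerCoalescing and its methods are hypothetical names, and aligning to 20 s boundaries is an assumption.

```java
import java.util.TreeSet;

// Hypothetical helper illustrating timer coalescing. Every quote asks for a
// timer at the next 20 s boundary, so all quotes inside one interval map to
// the same timestamp.
final class TimerCoalescing {
    static final long INTERVAL_MS = 20_000L;

    // Next multiple of INTERVAL_MS strictly after nowMs.
    static long nextBoundary(long nowMs) {
        return (nowMs / INTERVAL_MS + 1) * INTERVAL_MS;
    }

    // Simulates the timer service's per-key, per-timestamp deduplication:
    // adding the same timestamp twice to a set keeps a single timer.
    static TreeSet<Long> registerAll(long[] quoteArrivalsMs) {
        TreeSet<Long> timers = new TreeSet<>();
        for (long arrival : quoteArrivalsMs) {
            timers.add(nextBoundary(arrival));
        }
        return timers;
    }
}
```

With quotes every 3 s from 1 s to 25 s, this yields only two timers (at 20 s and 40 s). An interval with no quotes registers no timer for that stock; processElement1 could register the same boundary so that trade-only changes are also emitted.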


Confidentiality Note: This e-mail and its attachments contain confidential information from Huatai Securities Co., Ltd. and/or its subsidiaries, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it.
Disclaimer: The information or opinions provided in this email do not constitute an investment advice, an offer or solicitation to subscribe for, purchase or sell the investment product(s) mentioned herein. It does not have any regard to your specific investment objectives, financial situation and any of your particular needs. Accordingly, no warranty whatsoever is given and no liability whatsoever is accepted for any loss arising whether directly or indirectly as a result of this information.
Computer viruses can be transmitted via email. The recipient should check this email and any attachments for the presence of viruses. The company accepts no liability for any damage caused by any virus transmitted by this email. E-mail transmission cannot be guaranteed to be secure or error-free as information could be intercepted, corrupted, lost, destroyed, arrive late or incomplete, or contain viruses. The sender therefore does not accept liability for any errors or omissions in the contents of this message, which arise as a result of e-mail transmission.


Re: Question about Flink

Posted by Shammon FY <zj...@gmail.com>.
Hi

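Personally, I think you could split your current process computation into two parts and move the delta computation you mentioned, the part triggered every 20 s, to the stream2 side, in a form like this:
 stream1.keyBy().connect(stream2.keyBy().process(<handle deltas, emit every 20 s>)).process(<update ListState from the deltas>)

This way you don't need to search the ListState to find which entries were updated.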
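
The suggested split can be sketched as follows, under stated assumptions: the stream2-side process function conflates quotes to the latest price per stock and, every 20 s, emits only stocks whose price differs from the last emitted one, so the downstream connected operator receives at most one update per stock per interval and can recompute market value as soon as it arrives. QuoteDeltaBuffer, onQuote and flush are hypothetical names; in Flink the per-stock values would live in keyed state (for example a ValueState per stock code) rather than in one map, and flush would be driven by a 20 s timer. The "skip unchanged prices" rule is an added assumption, not part of the reply.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical quote-side operator for the suggested topology. One map here
// simulates per-key state across all stock codes.
final class QuoteDeltaBuffer {
    // Latest price seen per stock since the last flush.
    private final Map<String, Double> pending = new LinkedHashMap<>();
    // Price last sent downstream per stock, used to skip unchanged prices.
    private final Map<String, Double> lastEmitted = new HashMap<>();

    // Called for every quote (roughly every 3 s per stock): keep only the latest.
    void onQuote(String stockCode, double price) {
        pending.put(stockCode, price);
    }

    // Called every 20 s: return the changed latest prices and reset the buffer.
    Map<String, Double> flush() {
        Map<String, Double> delta = new LinkedHashMap<>();
        for (Map.Entry<String, Double> e : pending.entrySet()) {
            Double previous = lastEmitted.get(e.getKey());
            if (previous == null || previous.doubleValue() != e.getValue().doubleValue()) {
                delta.put(e.getKey(), e.getValue());
                lastEmitted.put(e.getKey(), e.getValue());
            }
        }
        pending.clear();
        return delta;
    }
}
```

The downstream operator then only touches the holdings of stocks present in each delta, which is what removes the need to scan the ListState for changes.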

Best,
Shammon

