Posted to user@flink.apache.org by Artem Bogachev <ar...@ostrovok.ru> on 2016/05/20 10:43:26 UTC

join stream with last available element of other stream

Hi,

I’ve run into a problem while trying to model our platform with Flink streams.

Let me describe our model:

// Stream of real data, e.g. stock prices: (AAPL, 100.0), (GZMP, 100.0), etc.
val realData: DataStream[(K, V)] = env.addSource(…)

// Stream of forecasts (same format) based on some window aggregates
// keyBy(0) keys by the symbol (field 0), the same key used for the join below
val forecastedData: DataStream[(K, V)] = realData.keyBy(0).timeWindow(Time.minutes(FORECAST_INTERVAL)).apply(new Forecaster(…))

I would like to construct a stream, errors, whose values are the differences between each realData value and the latest available forecast for the same key in the forecastedData stream.

// I suspect this does not guarantee that every realData value will have a corresponding forecast
val errors: DataStream[(K, V)] = realData.join(forecastedData).where(_._1).equalTo(_._1)…
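A window join only pairs elements that fall into the same window, so a real value whose most recent forecast was emitted in an earlier window finds no partner. One common alternative (not necessarily the one suggested in the other thread) is to connect the two streams, key both by symbol, and keep only the latest forecast per key: each forecast overwrites the stored one, and each real value is emitted as its difference from the stored forecast. The sketch below is framework-free Scala so it runs without Flink. In a Flink job, the two callbacks would correspond to the flatMap1/flatMap2 methods of a CoFlatMapFunction applied to realData.connect(forecastedData), and the mutable map would be replaced by Flink's keyed state. The names LatestForecastJoin, ErrorComputer, onReal and onForecast are hypothetical, and String/Double stand in for the K and V types used above.

```scala
import scala.collection.mutable

// Hypothetical, framework-free sketch of "join each real value with the
// latest forecast for the same key". In Flink, onReal/onForecast would be
// flatMap1/flatMap2 of a CoFlatMapFunction and the map would be keyed state.
object LatestForecastJoin {
  type Key = String   // stands in for K
  type Value = Double // stands in for V

  // Either a real value (Left) or a forecast (Right), in arrival order.
  type Event = Either[(Key, Value), (Key, Value)]

  final class ErrorComputer {
    // Latest forecast seen so far, per key (stand-in for keyed state).
    private val latestForecast = mutable.Map.empty[Key, Value]

    // A real value arrives: emit real - forecast if a forecast exists.
    def onReal(k: Key, v: Value): Option[(Key, Value)] =
      latestForecast.get(k).map(f => (k, v - f))

    // A forecast arrives: it replaces any earlier forecast for the key.
    def onForecast(k: Key, f: Value): Unit =
      latestForecast.update(k, f)
  }

  // Replays interleaved events and collects the error values.
  def errors(events: Seq[Event]): Seq[(Key, Value)] = {
    val c = new ErrorComputer
    events.flatMap {
      case Left((k, v)) => c.onReal(k, v).toSeq
      case Right((k, f)) =>
        c.onForecast(k, f)
        Seq.empty
    }
  }

  def main(args: Array[String]): Unit = {
    val events: Seq[Event] = Seq(
      Left(("AAPL", 100.0)),  // no forecast yet: nothing emitted
      Right(("AAPL", 98.0)),
      Left(("AAPL", 101.0)),  // emits (AAPL, 3.0)
      Right(("AAPL", 102.0)), // newer forecast replaces 98.0
      Left(("AAPL", 103.0)),  // emits (AAPL, 1.0)
      Left(("GZMP", 50.0))    // no forecast for GZMP: nothing emitted
    )
    errors(events).foreach(println)
  }
}
```

Real values that arrive before any forecast for their key are simply dropped here; if every real value must receive an error value, those values would have to be buffered until a forecast arrives. The result also depends on the relative arrival order of the two streams.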

Could you give me some advice on how to implement such a pattern? Do I have to write a custom window?

Artem

Re: join stream with last available element of other stream

Posted by Ufuk Celebi <uc...@apache.org>.
Aljoscha answered this in the other thread you started on this topic
("'Last One' Window").
