You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Udhay <ud...@gmail.com> on 2017/07/05 09:53:21 UTC

Joining two aggregated streams

I was trying to join two keyed streams in a particular way and get a combined
stream.
For example:
Lets say I call the two streams as X and Y. 
The X stream contains:

(Key,Value)

(A,P)
(A,Q)
(A,R)
(B,P)
(C,P)
(C,Q)

The Y stream contains:

(Key,Value,Flag1,Flag2)

(A,M1,0,0)
(A,M2,0,0)
(A,M3,1,0)
(A,M4,0,0)
(A,M5,1,0)
(A,M6,0,1)
(B,N1,0,0)
(B,N2,1,0)
(B,N3,0,1)
(C,O1,1,0)
(C,O2,0,1)

My objective is to join these two streams and get the combined value as
described. I want a keywise aggregated data of "Value" field from the X
stream. In the Y stream I want a keywise aggregation of "Value" field based
on "Flag1" i.e., the output will be set of aggregated values. I want to join
these two streams by maintaining a keyed window and that window gets
triggered only when the "Flag2" value of a particular key in the Y stream is
"1". These flag values are available only with Y stream and not with the X
stream. Thus my end result should look like:

(Key,Value1,Value2)

(A,(P#Q#R),[(M1#M2#M4#M6),(M3#M5)])
(B,P,[(N1#N3),(N2)])
(C,(P#Q),[(O1),(O2)])

The timings for each of the rows in each stream are such that by the time
the Flag2 value is 1 in stream Y (indicates some sort of end of a session),
all the rows in stream X are also already available.

I tried to maintains state value inside my join function to get to the
output. But I dont know how to query the state value and when to do it. Can
anyone please suggest some solution?





--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Joining-two-aggregated-streams-tp14123.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Joining two aggregated streams

Posted by Udhay <ud...@gmail.com>.
Hi 

Thanks for your suggestion. I ll try this one.:)

-Udhay.



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Joining-two-aggregated-streams-tp14123p14289.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Joining two aggregated streams

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

For this case, I would suggest to implement the join operation “by hand” using a CoProcessFunction: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html#low-level-joins <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html#low-level-joins>. You would have the first stream on the first input and the second stream on the second input. Inside the function you keep in state the stuff that you want to emit when you see that flag. Possibly in ListState. I would also suggest to set a cleanup timer to make sure that you cleanup state in case you never see the flag that triggers processing of your window.

Best,
Aljoscha

> On 5. Jul 2017, at 11:53, Udhay <ud...@gmail.com> wrote:
> 
> I was trying to join two keyed streams in a particular way and get a combined
> stream.
> For example:
> Lets say I call the two streams as X and Y. 
> The X stream contains:
> 
> (Key,Value)
> 
> (A,P)
> (A,Q)
> (A,R)
> (B,P)
> (C,P)
> (C,Q)
> 
> The Y stream contains:
> 
> (Key,Value,Flag1,Flag2)
> 
> (A,M1,0,0)
> (A,M2,0,0)
> (A,M3,1,0)
> (A,M4,0,0)
> (A,M5,1,0)
> (A,M6,0,1)
> (B,N1,0,0)
> (B,N2,1,0)
> (B,N3,0,1)
> (C,O1,1,0)
> (C,O2,0,1)
> 
> My objective is to join these two streams and get the combined value as
> described. I want a keywise aggregated data of "Value" field from the X
> stream. In the Y stream I want a keywise aggregation of "Value" field based
> on "Flag1" i.e., the output will be set of aggregated values. I want to join
> these two streams by maintaining a keyed window and that window gets
> triggered only when the "Flag2" value of a particular key in the Y stream is
> "1". These flag values are available only with Y stream and not with the X
> stream. Thus my end result should look like:
> 
> (Key,Value1,Value2)
> 
> (A,(P#Q#R),[(M1#M2#M4#M6),(M3#M5)])
> (B,P,[(N1#N3),(N2)])
> (C,(P#Q),[(O1),(O2)])
> 
> The timings for each of the rows in each stream are such that by the time
> the Flag2 value is 1 in stream Y (indicates some sort of end of a session),
> all the rows in stream X are also already available.
> 
> I tried to maintains state value inside my join function to get to the
> output. But I dont know how to query the state value and when to do it. Can
> anyone please suggest some solution?
> 
> 
> 
> 
> 
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Joining-two-aggregated-streams-tp14123.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.