You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "Aggarwal, Ajay" <Aj...@netapp.com> on 2019/02/01 21:42:39 UTC

Reverse of KeyBy

I am new to Flink. I am trying to figure out if there is an operator that provides reverse functionality of KeyBy.  Using KeyBy you can split a stream into disjoint partitions. Is there a way to bring those partitions back into a single stream?



Let me explain using my use case below.



My Input stream contains messages with following information

{

    MyKey

    LargeMessageId

    LargeMessageFragment

    LargeMessageTimestamp // yes same timestamp repeated with each fragment



    (… there are other fields, but I am leaving them out as they are not important for this discussion)

}





My LargeMessage is fragmented at source into fragments. I have 2 main requirements

  1.  Reassemble these fragments back into LargeMessages
  2.  For each MyKey value, process the LargeMessages in the order based on time associated with them.





So I am thinking



InputStream

  .KeyBy (MyKey)

  .KeyBy (LargeMessageId)

  .flatMap(new MyReassemblyFunction())

  . ???



At this point I need to throw all assembled LargeMessages for a given MyKey back into a common partition, so I can try to process them in order.  This is where I am stuck. Any help from the experts will be much appreciated.



Ajay


Re: Reverse of KeyBy

Posted by "Aggarwal, Ajay" <Aj...@netapp.com>.
Thanks Fabian for the explanation. Let me do some more reading so what you said can sync-in little more.

From: Fabian Hueske <fh...@gmail.com>
Date: Monday, February 4, 2019 at 10:22 AM
To: "Aggarwal, Ajay" <Aj...@netapp.com>
Cc: Congxian Qiu <qc...@gmail.com>, "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Reverse of KeyBy

Hi,

Subpartitions are just a logical concept. When you keyBy a stream, the next operator will be applied in a keyed context. After that, the data might still be partitioned, but the keyed context is gone.
Is this what you mean by automatic "joining of partitioned sub-streams"?

With the program that you shared before, the following happens:

(1) The records are partitioned on the LargeMessageId, i.e., all records with the same LargeMessageId are sent to the same task.
(2) The task collects all fragements of a large message in keyed state. The state is always scoped to the key (LargeMessageId). Once it collected all fragments, it emits a complete message.
(3) The completed messages are partitioned on MyKey, i.e., all messages with the same MyKey are sent to the same task.
(4) A function can collect and sort the messages to process them in order.

Since you shuffle the records twice you cannot (in general) expect the records to be still in order.

Best, Fabian


Am Mo., 4. Feb. 2019 um 16:08 Uhr schrieb Aggarwal, Ajay <Aj...@netapp.com>>:
Yes, LargeMessageId is globally unique, so I shouldn’t need composite key.

So both of you are suggesting I do the following

InputStream
  (1)   .keyBy (LargeMessageId)
  (2)   .flatMap(new MyReassemblyFunction())
  (3)   .keyBy(MyKey)
  (4)   .???

Let me explain my doubt (perhaps due to lack of understanding). By the way I am expecting to run this job with parallelism > 1. My understanding of above is as below:

First operator (1): First KeyBy (LargeMessageId) will partition the input stream by LargeMessageId. Right here messages with same MyKey value will be spread across these partitions. Is it not a problem already?
Second operator (2) : run flatMap(new MyReassemblyFunction()) on these partitions. Here each one will produce exactly one LargeMessage.
Third operator (3):  At this point I don’t understand the point of second KeyBy(MyKey)? My understanding is that this will further partition the already partitioned input stream (from 1 above) and will not help me, as I need to process all LargeMessages for a given MyKey in order.

Is there an implicit assumption here that the flatMap operation (2) above will automatically join the partitioned sub-streams from first KeyBy into a single stream?

Ajay


From: Fabian Hueske <fh...@gmail.com>>
Date: Monday, February 4, 2019 at 9:17 AM
To: "Aggarwal, Ajay" <Aj...@netapp.com>>
Cc: Congxian Qiu <qc...@gmail.com>>, "user@flink.apache.org<ma...@flink.apache.org>" <us...@flink.apache.org>>
Subject: Re: Reverse of KeyBy

Hi,

Calling keyBy twice will not work, because the second call overrides the first.
You can keyBy on a composite key (MyKey, LargeMessageId).

You can do the following

InputStream
  .keyBy ( (MyKey, LargeMessageId) ) \\ use composite key
  .flatMap(new MyReassemblyFunction())
  .keyBy(MyKey)
  .???

If LargeMessageId is unique across MyKey (there are not two large messages with the same LargeMessageId and different MyKey values), you don't need a composite key but can use keyBy(LargeMessageId).

Best, Fabian


Am Mo., 4. Feb. 2019 um 15:05 Uhr schrieb Aggarwal, Ajay <Aj...@netapp.com>>:
Thank you for your suggestion. But per my understanding if I KeyBy (LargeMessageId) first then I can’t guarantee order of LargeMessages per MyKey. Because MyKey messages will get spread over multiple partitions by LargeMessageId. Am I correct?


From: Congxian Qiu <qc...@gmail.com>>
Date: Sunday, February 3, 2019 at 6:40 AM
To: "Aggarwal, Ajay" <Aj...@netapp.com>>
Cc: "user@flink.apache.org<ma...@flink.apache.org>" <us...@flink.apache.org>>
Subject: Re: Reverse of KeyBy
Hi Aggarwal
   How about keyBy(LargeMessageID) first, then assemble these fragments back into LargeMessages, then keyBy(MeyKey)?

Best,
Congxian


Aggarwal, Ajay <Aj...@netapp.com>> 于2019年2月2日周六 上午5:42写道:

I am new to Flink. I am trying to figure out if there is an operator that provides reverse functionality of KeyBy.  Using KeyBy you can split a stream into disjoint partitions. Is there a way to bring those partitions back into a single stream?



Let me explain using my use case below.



My Input stream contains messages with following information

{

    MyKey

    LargeMessageId

    LargeMessageFragment

    LargeMessageTimestamp // yes same timestamp repeated with each fragment



    (… there are other fields, but I am leaving them out as they are not important for this discussion)

}





My LargeMessage is fragmented at source into fragments. I have 2 main requirements

  1.  Reassemble these fragments back into LargeMessages
  2.  For each MyKey value, process the LargeMessages in the order based on time associated with them.





So I am thinking



InputStream

  .KeyBy (MyKey)

  .KeyBy (LargeMessageId)

  .flatMap(new MyReassemblyFunction())

  . ???



At this point I need to throw all assembled LargeMessages for a given MyKey back into a common partition, so I can try to process them in order.  This is where I am stuck. Any help from the experts will be much appreciated.



Ajay


Re: Reverse of KeyBy

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Subpartitions are just a logical concept. When you keyBy a stream, the next
operator will be applied in a keyed context. After that, the data might
still be partitioned, but the keyed context is gone.
Is this what you mean by automatic "joining of partitioned sub-streams"?

With the program that you shared before, the following happens:

(1) The records are partitioned on the LargeMessageId, i.e., all records
with the same LargeMessageId are sent to the same task.
(2) The task collects all fragements of a large message in keyed state. The
state is always scoped to the key (LargeMessageId). Once it collected all
fragments, it emits a complete message.
(3) The completed messages are partitioned on MyKey, i.e., all messages
with the same MyKey are sent to the same task.
(4) A function can collect and sort the messages to process them in order.

Since you shuffle the records twice you cannot (in general) expect the
records to be still in order.

Best, Fabian


Am Mo., 4. Feb. 2019 um 16:08 Uhr schrieb Aggarwal, Ajay <
Ajay.Aggarwal@netapp.com>:

> Yes, LargeMessageId is globally unique, so I shouldn’t need composite key.
>
>
>
> So both of you are suggesting I do the following
>
>
>
> InputStream
>   (1)   .keyBy (LargeMessageId)
>
>   (2)   .flatMap(new MyReassemblyFunction())
>
>   (3)   .keyBy(MyKey)
>
>   (4)   .???
>
>
>
> Let me explain my doubt (perhaps due to lack of understanding). By the way
> I am expecting to run this job with parallelism > 1. My understanding of
> above is as below:
>
>
>
> First operator (1): First KeyBy (LargeMessageId) will partition the input
> stream by LargeMessageId. Right here messages with same MyKey value will be
> spread across these partitions. Is it not a problem already?
>
> Second operator (2) : run flatMap(new MyReassemblyFunction()) on these
> partitions. Here each one will produce exactly one LargeMessage.
>
> Third operator (3):  At this point I don’t understand the point of second
> KeyBy(MyKey)? My understanding is that this will further partition the
> already partitioned input stream (from 1 above) and will not help me, as I
> need to process all LargeMessages for a given MyKey in order.
>
>
>
> Is there an implicit assumption here that the flatMap operation (2) above
> will automatically join the partitioned sub-streams from first KeyBy into a
> single stream?
>
>
>
> Ajay
>
>
>
>
>
> *From: *Fabian Hueske <fh...@gmail.com>
> *Date: *Monday, February 4, 2019 at 9:17 AM
> *To: *"Aggarwal, Ajay" <Aj...@netapp.com>
> *Cc: *Congxian Qiu <qc...@gmail.com>, "user@flink.apache.org" <
> user@flink.apache.org>
> *Subject: *Re: Reverse of KeyBy
>
>
>
> Hi,
>
>
>
> Calling keyBy twice will not work, because the second call overrides the
> first.
>
> You can keyBy on a composite key (MyKey, LargeMessageId).
>
>
>
> You can do the following
>
>
>
> InputStream
>   .keyBy ( (MyKey, LargeMessageId) ) \\ use composite key
>   .flatMap(new MyReassemblyFunction())
>
>   .keyBy(MyKey)
>
>   .???
>
>
>
> If LargeMessageId is unique across MyKey (there are not two large messages
> with the same LargeMessageId and different MyKey values), you don't need a
> composite key but can use keyBy(LargeMessageId).
>
>
>
> Best, Fabian
>
>
>
>
>
> Am Mo., 4. Feb. 2019 um 15:05 Uhr schrieb Aggarwal, Ajay <
> Ajay.Aggarwal@netapp.com>:
>
> Thank you for your suggestion. But per my understanding if I KeyBy
> (LargeMessageId) first then I can’t guarantee order of LargeMessages per
> MyKey. Because MyKey messages will get spread over multiple partitions by
> LargeMessageId. Am I correct?
>
>
>
>
>
> *From: *Congxian Qiu <qc...@gmail.com>
> *Date: *Sunday, February 3, 2019 at 6:40 AM
> *To: *"Aggarwal, Ajay" <Aj...@netapp.com>
> *Cc: *"user@flink.apache.org" <us...@flink.apache.org>
> *Subject: *Re: Reverse of KeyBy
>
> Hi Aggarwal
>    How about keyBy(LargeMessageID) first, then assemble these fragments
> back into LargeMessages, then keyBy(MeyKey)?
>
>
>
> Best,
>
> Congxian
>
>
>
>
>
> Aggarwal, Ajay <Aj...@netapp.com> 于2019年2月2日周六 上午5:42写道:
>
> I am new to Flink. I am trying to figure out if there is an operator that
> provides reverse functionality of KeyBy.  Using KeyBy you can split a
> stream into disjoint partitions. Is there a way to bring those partitions
> back into a single stream?
>
>
>
> Let me explain using my use case below.
>
>
>
> My Input stream contains messages with following information
>
> {
>
>     MyKey
>
>     LargeMessageId
>
>     LargeMessageFragment
>
>     LargeMessageTimestamp // yes same timestamp repeated with each
> fragment
>
>
>
>     (… there are other fields, but I am leaving them out as they are not
> important for this discussion)
>
> }
>
>
>
>
>
> My LargeMessage is fragmented at source into fragments. I have 2 main
> requirements
>
>    1. Reassemble these fragments back into LargeMessages
>    2. For each MyKey value, process the LargeMessages in the order based
>    on time associated with them.
>
>
>
>
>
> So I am thinking
>
>
>
> InputStream
>
>   .KeyBy (MyKey)
>
>   .KeyBy (LargeMessageId)
>
>   .flatMap(new MyReassemblyFunction())
>
>   . ???
>
>
>
> At this point I need to throw all assembled LargeMessages for a given
> MyKey back into a common partition, so I can try to process them in order.
> This is where I am stuck. Any help from the experts will be much
> appreciated.
>
>
>
> Ajay
>
>
>
>

Re: Reverse of KeyBy

Posted by "Aggarwal, Ajay" <Aj...@netapp.com>.
Yes, LargeMessageId is globally unique, so I shouldn’t need composite key.

So both of you are suggesting I do the following

InputStream
  (1)   .keyBy (LargeMessageId)
  (2)   .flatMap(new MyReassemblyFunction())
  (3)   .keyBy(MyKey)
  (4)   .???

Let me explain my doubt (perhaps due to lack of understanding). By the way I am expecting to run this job with parallelism > 1. My understanding of above is as below:

First operator (1): First KeyBy (LargeMessageId) will partition the input stream by LargeMessageId. Right here messages with same MyKey value will be spread across these partitions. Is it not a problem already?
Second operator (2) : run flatMap(new MyReassemblyFunction()) on these partitions. Here each one will produce exactly one LargeMessage.
Third operator (3):  At this point I don’t understand the point of second KeyBy(MyKey)? My understanding is that this will further partition the already partitioned input stream (from 1 above) and will not help me, as I need to process all LargeMessages for a given MyKey in order.

Is there an implicit assumption here that the flatMap operation (2) above will automatically join the partitioned sub-streams from first KeyBy into a single stream?

Ajay


From: Fabian Hueske <fh...@gmail.com>
Date: Monday, February 4, 2019 at 9:17 AM
To: "Aggarwal, Ajay" <Aj...@netapp.com>
Cc: Congxian Qiu <qc...@gmail.com>, "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Reverse of KeyBy

Hi,

Calling keyBy twice will not work, because the second call overrides the first.
You can keyBy on a composite key (MyKey, LargeMessageId).

You can do the following

InputStream
  .keyBy ( (MyKey, LargeMessageId) ) \\ use composite key
  .flatMap(new MyReassemblyFunction())
  .keyBy(MyKey)
  .???

If LargeMessageId is unique across MyKey (there are not two large messages with the same LargeMessageId and different MyKey values), you don't need a composite key but can use keyBy(LargeMessageId).

Best, Fabian


Am Mo., 4. Feb. 2019 um 15:05 Uhr schrieb Aggarwal, Ajay <Aj...@netapp.com>>:
Thank you for your suggestion. But per my understanding if I KeyBy (LargeMessageId) first then I can’t guarantee order of LargeMessages per MyKey. Because MyKey messages will get spread over multiple partitions by LargeMessageId. Am I correct?


From: Congxian Qiu <qc...@gmail.com>>
Date: Sunday, February 3, 2019 at 6:40 AM
To: "Aggarwal, Ajay" <Aj...@netapp.com>>
Cc: "user@flink.apache.org<ma...@flink.apache.org>" <us...@flink.apache.org>>
Subject: Re: Reverse of KeyBy
Hi Aggarwal
   How about keyBy(LargeMessageID) first, then assemble these fragments back into LargeMessages, then keyBy(MeyKey)?

Best,
Congxian


Aggarwal, Ajay <Aj...@netapp.com>> 于2019年2月2日周六 上午5:42写道:

I am new to Flink. I am trying to figure out if there is an operator that provides reverse functionality of KeyBy.  Using KeyBy you can split a stream into disjoint partitions. Is there a way to bring those partitions back into a single stream?



Let me explain using my use case below.



My Input stream contains messages with following information

{

    MyKey

    LargeMessageId

    LargeMessageFragment

    LargeMessageTimestamp // yes same timestamp repeated with each fragment



    (… there are other fields, but I am leaving them out as they are not important for this discussion)

}





My LargeMessage is fragmented at source into fragments. I have 2 main requirements

  1.  Reassemble these fragments back into LargeMessages
  2.  For each MyKey value, process the LargeMessages in the order based on time associated with them.





So I am thinking



InputStream

  .KeyBy (MyKey)

  .KeyBy (LargeMessageId)

  .flatMap(new MyReassemblyFunction())

  . ???



At this point I need to throw all assembled LargeMessages for a given MyKey back into a common partition, so I can try to process them in order.  This is where I am stuck. Any help from the experts will be much appreciated.



Ajay


Re: Reverse of KeyBy

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Calling keyBy twice will not work, because the second call overrides the
first.
You can keyBy on a composite key (MyKey, LargeMessageId).

You can do the following

InputStream
  .keyBy ( (MyKey, LargeMessageId) ) \\ use composite key
  .flatMap(new MyReassemblyFunction())
  .keyBy(MyKey)
  .???

If LargeMessageId is unique across MyKey (there are not two large messages
with the same LargeMessageId and different MyKey values), you don't need a
composite key but can use keyBy(LargeMessageId).

Best, Fabian


Am Mo., 4. Feb. 2019 um 15:05 Uhr schrieb Aggarwal, Ajay <
Ajay.Aggarwal@netapp.com>:

> Thank you for your suggestion. But per my understanding if I KeyBy
> (LargeMessageId) first then I can’t guarantee order of LargeMessages per
> MyKey. Because MyKey messages will get spread over multiple partitions by
> LargeMessageId. Am I correct?
>
>
>
>
>
> *From: *Congxian Qiu <qc...@gmail.com>
> *Date: *Sunday, February 3, 2019 at 6:40 AM
> *To: *"Aggarwal, Ajay" <Aj...@netapp.com>
> *Cc: *"user@flink.apache.org" <us...@flink.apache.org>
> *Subject: *Re: Reverse of KeyBy
>
> Hi Aggarwal
>    How about keyBy(LargeMessageID) first, then assemble these fragments
> back into LargeMessages, then keyBy(MeyKey)?
>
>
>
> Best,
>
> Congxian
>
>
>
>
>
> Aggarwal, Ajay <Aj...@netapp.com> 于2019年2月2日周六 上午5:42写道:
>
> I am new to Flink. I am trying to figure out if there is an operator that
> provides reverse functionality of KeyBy.  Using KeyBy you can split a
> stream into disjoint partitions. Is there a way to bring those partitions
> back into a single stream?
>
>
>
> Let me explain using my use case below.
>
>
>
> My Input stream contains messages with following information
>
> {
>
>     MyKey
>
>     LargeMessageId
>
>     LargeMessageFragment
>
>     LargeMessageTimestamp // yes same timestamp repeated with each
> fragment
>
>
>
>     (… there are other fields, but I am leaving them out as they are not
> important for this discussion)
>
> }
>
>
>
>
>
> My LargeMessage is fragmented at source into fragments. I have 2 main
> requirements
>
>    1. Reassemble these fragments back into LargeMessages
>    2. For each MyKey value, process the LargeMessages in the order based
>    on time associated with them.
>
>
>
>
>
> So I am thinking
>
>
>
> InputStream
>
>   .KeyBy (MyKey)
>
>   .KeyBy (LargeMessageId)
>
>   .flatMap(new MyReassemblyFunction())
>
>   . ???
>
>
>
> At this point I need to throw all assembled LargeMessages for a given
> MyKey back into a common partition, so I can try to process them in order.
> This is where I am stuck. Any help from the experts will be much
> appreciated.
>
>
>
> Ajay
>
>
>
>

Re: Reverse of KeyBy

Posted by "Aggarwal, Ajay" <Aj...@netapp.com>.
Thank you for your suggestion. But per my understanding if I KeyBy (LargeMessageId) first then I can’t guarantee order of LargeMessages per MyKey. Because MyKey messages will get spread over multiple partitions by LargeMessageId. Am I correct?


From: Congxian Qiu <qc...@gmail.com>
Date: Sunday, February 3, 2019 at 6:40 AM
To: "Aggarwal, Ajay" <Aj...@netapp.com>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Reverse of KeyBy
Hi Aggarwal
   How about keyBy(LargeMessageID) first, then assemble these fragments back into LargeMessages, then keyBy(MeyKey)?

Best,
Congxian


Aggarwal, Ajay <Aj...@netapp.com>> 于2019年2月2日周六 上午5:42写道:

I am new to Flink. I am trying to figure out if there is an operator that provides reverse functionality of KeyBy.  Using KeyBy you can split a stream into disjoint partitions. Is there a way to bring those partitions back into a single stream?



Let me explain using my use case below.



My Input stream contains messages with following information

{

    MyKey

    LargeMessageId

    LargeMessageFragment

    LargeMessageTimestamp // yes same timestamp repeated with each fragment



    (… there are other fields, but I am leaving them out as they are not important for this discussion)

}





My LargeMessage is fragmented at source into fragments. I have 2 main requirements

  1.  Reassemble these fragments back into LargeMessages
  2.  For each MyKey value, process the LargeMessages in the order based on time associated with them.





So I am thinking



InputStream

  .KeyBy (MyKey)

  .KeyBy (LargeMessageId)

  .flatMap(new MyReassemblyFunction())

  . ???



At this point I need to throw all assembled LargeMessages for a given MyKey back into a common partition, so I can try to process them in order.  This is where I am stuck. Any help from the experts will be much appreciated.



Ajay


Re: Reverse of KeyBy

Posted by Congxian Qiu <qc...@gmail.com>.
Hi Aggarwal   How about keyBy(LargeMessageID) first, then assemble these
fragments back into LargeMessages, then keyBy(MeyKey)?

Best,
Congxian


Aggarwal, Ajay <Aj...@netapp.com> 于2019年2月2日周六 上午5:42写道:

> I am new to Flink. I am trying to figure out if there is an operator that
> provides reverse functionality of KeyBy.  Using KeyBy you can split a
> stream into disjoint partitions. Is there a way to bring those partitions
> back into a single stream?
>
>
>
> Let me explain using my use case below.
>
>
>
> My Input stream contains messages with following information
>
> {
>
>     MyKey
>
>     LargeMessageId
>
>     LargeMessageFragment
>
>     LargeMessageTimestamp // yes same timestamp repeated with each
> fragment
>
>
>
>     (… there are other fields, but I am leaving them out as they are not
> important for this discussion)
>
> }
>
>
>
>
>
> My LargeMessage is fragmented at source into fragments. I have 2 main
> requirements
>
>    1. Reassemble these fragments back into LargeMessages
>    2. For each MyKey value, process the LargeMessages in the order based
>    on time associated with them.
>
>
>
>
>
> So I am thinking
>
>
>
> InputStream
>
>   .KeyBy (MyKey)
>
>   .KeyBy (LargeMessageId)
>
>   .flatMap(new MyReassemblyFunction())
>
>   . ???
>
>
>
> At this point I need to throw all assembled LargeMessages for a given
> MyKey back into a common partition, so I can try to process them in order.
> This is where I am stuck. Any help from the experts will be much
> appreciated.
>
>
>
> Ajay
>
>
>