Posted to user@flink.apache.org by Björn Zachrisson <bj...@gmail.com> on 2018/02/16 11:05:39 UTC

Keyby multiple elements

Hi,

I'm seeing something weird, probably a user error :)

I'm running a keyBy on multiple elements:

val keyedStream = nonKeyedStream
  .keyBy(m => (m._1, m._2, m._3.getOrElse(-1), m._4))

Then I apply a window function:

val appliedWindow = keyedStream
  .timeWindow(minutes(WindowTimeMinutes))
  .allowedLateness(minutes(WindowDelayMinutes))
  .apply(new windowFunc1)


This is the first thing I do in the apply function:

override def apply(key: (Int, Long, Int, Int), window: TimeWindow, input: Iterable[T4], out: Collector[T4]): Unit = {
  myClass.addKey(key, window)
  // ... (rest of the function omitted)
}


And in this object I have a function:

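import scala.collection.mutable
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object myClass {
  // Last window seen for each key
  val keyHash = new mutable.HashMap[(Int, Long, Int, Int), TimeWindow]()

  // Records the window for a key and logs when the key has been seen before
  def addKey(key: (Int, Long, Int, Int), window: TimeWindow): Unit = {
    if (keyHash.contains(key)) {
      println("Multiple key found for: " + key)
      println(keyHash(key))
      println(window)
    }
    keyHash.put(key, window)
  }
}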


This outputs:
Multiple key found for: (1,2,3,4)
TimeWindow{start=1518268800000, end=1518270000000}
TimeWindow{start=1518268800000, end=1518270000000}


So it seems that either the keyBy is not doing what it is supposed to, or I
am messing things up somewhere, but I cannot find where.

Regards
Björn

Re: Keyby multiple elements

Posted by Fabian Hueske <fh...@gmail.com>.
Yes, you can do that. You would have to define a custom trigger.
Alternatively, you can also generate more conservative watermarks. That
would have the same effect.
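
As a rough illustration of why the two options line up, here is a plain-Scala
sketch with no Flink dependency. The object and method names are made up for
this example, and it assumes a watermark that trails the highest timestamp
seen so far by a fixed lag, which is only one possible way of generating
watermarks.

```scala
// Hypothetical model, not Flink code: compares the two ways of delaying
// the firing of an event-time window.
object DelayedFiring {
  /** Watermark that trails the highest timestamp seen so far by `lag`. */
  def watermark(maxSeen: Long, lag: Long): Long = maxSeen - lag

  /** Option 1 (custom trigger idea): keep the watermark as it is, but fire
    * only once it has reached the window's max timestamp plus the allowed
    * lateness. */
  def firesWithDelayedTrigger(maxSeen: Long, lag: Long,
                              windowMaxTs: Long, allowedLateness: Long): Boolean =
    watermark(maxSeen, lag) >= windowMaxTs + allowedLateness

  /** Option 2 (more conservative watermarks): keep the usual firing rule,
    * but let the watermark lag behind by an extra `allowedLateness`. */
  def firesWithConservativeWatermark(maxSeen: Long, lag: Long,
                                     windowMaxTs: Long, allowedLateness: Long): Boolean =
    watermark(maxSeen, lag + allowedLateness) >= windowMaxTs
}
```

Both methods return the same answer for any input, since the extra delay is
only moved from one side of the comparison to the other. In Flink terms,
option 1 would roughly correspond to a custom Trigger that registers its
event-time timer at window.maxTimestamp() plus the allowed lateness rather
than at window.maxTimestamp(); that mapping is an assumption of this sketch
and is not exercised by it.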

Best, Fabian

2018-02-16 12:25 GMT+01:00 Björn Zachrisson <bj...@gmail.com>:

> Hi Fabian,
>
> It does not, since my events are out of order within a certain interval, and
> removing allowedLateness reduces the number of elements processed by 99.5%.
> Is it possible to trigger the window only once the allowed lateness
> has passed?
>
>
> Regards
> Björn Zachrisson

Re: Keyby multiple elements

Posted by Björn Zachrisson <bj...@gmail.com>.
Hi Fabian,

It does not, since my events are out of order within a certain interval, and
removing allowedLateness reduces the number of elements processed by 99.5%.
Is it possible to trigger the window only once the allowed lateness
has passed?


Regards
Björn Zachrisson

On 16 February 2018 at 12:17, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Björn,
>
> You configured an allowed lateness, so this might be caused by late
> arriving data.
> In case a late record is received within the allowed lateness, the window
> function will fire again for the same window.
> Does that also happen if you remove the allowedLateness() call?
>
> Best, Fabian

Re: Keyby multiple elements

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Björn,

You configured an allowed lateness, so this might be caused by late
arriving data.
In case a late record is received within the allowed lateness, the window
function will fire again for the same window.
Does that also happen if you remove the allowedLateness() call?
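
A minimal plain-Scala sketch of that behaviour, without any Flink dependency.
Everything in it (LatenessModel, Event, firings) is a hypothetical name for
this example, and the watermark is simplified to "highest timestamp seen so
far", which is an assumption and not how a real job has to generate it.

```scala
import scala.collection.mutable

// Hypothetical model of a tumbling event-time window with allowed lateness.
object LatenessModel {
  final case class Event(key: String, timestamp: Long)

  /** Returns (key, windowStart) for every firing, in firing order.
    * Events are processed in arrival order; timestamps must be >= 0. */
  def firings(events: Seq[Event], windowSize: Long,
              allowedLateness: Long): List[(String, Long)] = {
    var watermark = Long.MinValue
    val pending = mutable.LinkedHashSet.empty[(String, Long)] // not fired yet
    val fired = mutable.ListBuffer.empty[(String, Long)]

    for (e <- events) {
      val start = e.timestamp - (e.timestamp % windowSize)
      val maxTs = start + windowSize - 1
      val window = (e.key, start)

      // Elements arriving after maxTs + allowedLateness are dropped.
      val tooLate = maxTs + allowedLateness <= watermark
      if (!tooLate) {
        if (maxTs <= watermark) fired += window // late element: fires again
        else pending += window                  // waits for the watermark
      }

      // Advance the watermark and fire every window it has now passed.
      watermark = math.max(watermark, e.timestamp)
      val ready = pending.filter { case (_, s) => s + windowSize - 1 <= watermark }.toList
      ready.foreach { w => fired += w; pending -= w }
    }

    fired ++= pending // end of input: remaining windows fire once
    fired.toList
  }
}
```

With a window size of 10 and an allowed lateness of 5, the arrival order
(a,1), (a,12), (a,3) makes the window starting at 0 fire twice for key a: once
when the watermark passes it, and once more for the late element with
timestamp 3. With an allowed lateness of 0 the late element is dropped and
each window fires once.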

Best, Fabian

