Posted to user@flink.apache.org by Gabriel Pelielo | Stone <gp...@stone.com.br> on 2018/08/27 15:29:20 UTC

Difficulty managing keyed streams

Hello everyone.


I'm currently developing a Flink program to aggregate information about my company's clients' credit card transactions. Each transaction has a clientId and a transactionDate associated with it. What I want to do is build a sliding event-time window of size 21 days that slides every 1 hour, comparing a client's number of transactions in a given hour to their number of transactions in the same hour of the previous weeks, for example:


  *   Monday, August 13th 10:00 to 11:00 => 42 transactions.
  *   Monday, August 20th 10:00 to 11:00 => 20 transactions.
  *   Monday, August 27th 10:00 to 11:00 => 29 transactions.


My problem is that if a client does not make a single transaction for a whole hour, I don't get a 0 appended to my resulting list of transactions. What I need as the final result is a list of 3 elements, one transaction count per week of the window, together with the start and the end of the window, for example:


TransactionProfile(ClientId1, 2018-02-26T14:00, 2018-03-19T14:00, 15, 0, 10).
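
For illustration, that result has roughly the shape of the case class below (the field names are only a sketch for this message, not my actual definition):

import java.time.LocalDateTime

case class TransactionProfile(
  clientId: String,            // key of the stream
  windowStart: LocalDateTime,  // start of the 21-day window
  windowEnd: LocalDateTime,    // end of the 21-day window
  weeklyCounts: Seq[Long]      // one count per week of the window, e.g. Seq(15, 0, 10)
)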


I have a keyed stream with the key being the clientId, and my idea to solve this problem was to append a 0 to the lists of the clientIds that did not make any transactions in that hour whenever another clientId finishes its window, but I don't know how to achieve this. Any help would be appreciated.


I'm coding in Scala with Flink 1.4 and a piece of my code is the following:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val profileStream: DataStream[Profile] = streamFromKafka
  .map(openTransaction => Transaction(openTransaction.clientId, openTransaction.transactionDate))
  .keyBy(trx => trx.clientId)
  // 21-day event-time window that slides every hour
  .window(SlidingEventTimeWindows.of(Time.days(21), Time.hours(1)))
  .aggregate(new CountAggregate(), new TransactionProcessWindowFunction())
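
For context, the two functions are roughly along these lines (simplified for this message; in particular, the Profile constructor shown here just stands in for my real one):

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Incremental pre-aggregation: count the transactions that fall into the window.
class CountAggregate extends AggregateFunction[Transaction, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: Transaction, accumulator: Long): Long = accumulator + 1
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

// Attaches the window start/end to the pre-aggregated count of the keyed client.
class TransactionProcessWindowFunction
    extends ProcessWindowFunction[Long, Profile, String, TimeWindow] {
  override def process(clientId: String,
                       context: Context,
                       counts: Iterable[Long],
                       out: Collector[Profile]): Unit = {
    // counts holds a single element: the result of CountAggregate for this window.
    out.collect(Profile(clientId, context.window.getStart, context.window.getEnd, counts.head))
  }
}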


Best Regards,







Gabriel Pelielo
Fraud Detection

+ 55 21 98603 9725
gpelielo@stone.com.br

stone.com.br <http://www.stone.com.br/>






Re: Difficulty managing keyed streams

Posted by vino yang <ya...@gmail.com>.
Hi Gabriel,

In your scenario, I guess you should be working with event time.
In that case, I think you can trigger the window yourself by customizing the
window's trigger and combining it with a ProcessWindowFunction[1] to define
your calculation logic, because most of your processing is driven by the
watermark and only a few cases need timer-based triggering.

A similar example is here.[2]

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#processwindowfunction
[2]:
https://stackoverflow.com/questions/47059762/apache-flink-send-event-if-no-data-was-received-for-x-minutes
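
To make the pattern from [2] concrete, here is a rough, untested sketch (all
names are invented for illustration, not taken from your code, and it assumes
timestamps and watermarks are already assigned): a ProcessFunction on the
stream keyed by clientId that forwards every transaction as a count of 1 and
emits an explicit zero once an hour of event time passes without any
transaction for that client, so the quiet hours also reach the windows
downstream.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class ZeroCountEmitter extends ProcessFunction[Transaction, (String, Long)] {

  private val oneHour = 60 * 60 * 1000L

  // clientId and timestamp of the last transaction seen for the current key.
  private lazy val lastClient: ValueState[String] =
    getRuntimeContext.getState(new ValueStateDescriptor("lastClient", classOf[String]))
  private lazy val lastSeen: ValueState[java.lang.Long] =
    getRuntimeContext.getState(new ValueStateDescriptor("lastSeen", classOf[java.lang.Long]))

  override def processElement(trx: Transaction,
                              ctx: ProcessFunction[Transaction, (String, Long)]#Context,
                              out: Collector[(String, Long)]): Unit = {
    lastClient.update(trx.clientId)
    lastSeen.update(ctx.timestamp())
    // Forward the real transaction as a count of 1.
    out.collect((trx.clientId, 1L))
    // Ask to be called back one hour of event time later.
    ctx.timerService().registerEventTimeTimer(ctx.timestamp() + oneHour)
  }

  override def onTimer(timestamp: Long,
                       ctx: ProcessFunction[Transaction, (String, Long)]#OnTimerContext,
                       out: Collector[(String, Long)]): Unit = {
    // Emit a zero only if no newer transaction arrived since this timer was set,
    // and keep probing the following hours until data shows up again.
    if (lastSeen.value() != null && lastSeen.value() + oneHour <= timestamp) {
      out.collect((lastClient.value(), 0L))
      ctx.timerService().registerEventTimeTimer(timestamp + oneHour)
    }
  }
}

The (clientId, count) pairs can then be keyed and windowed the same way as the
real transactions, summing the counts instead of counting elements, so the
quiet hours contribute an explicit 0 to the result.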

Thanks, vino.

