Posted to user@storm.apache.org by "郭亚峰(默岭)" <ya...@alibaba-inc.com> on 2017/04/02 07:48:17 UTC

How to calculate state-based metrics in Beam?

Hi, hope you are all doing well.
My case: Say a bank has several windows, and each window provides a different kind of service to its customers. People line up in queues in front of the windows. They can switch from one queue to another when they realize they need a different service, or give up if a queue is too long, but most people eventually get the service they need.
What I need to calculate: the queue length in front of each window, in real time (a reasonable latency is quite OK).
Data source: it is a home-grown system, but quite similar to Kafka. When I subscribe to the topic, each record I get looks like this: timestamp (in millis), session_id (identifies a whole service interaction, from the very beginning to the end), state_id (0: in queue, 1: in service, 2: done), queue_id.
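In Java terms, each record is roughly the following (a minimal sketch; the class and field names are only illustrative, not the real schema of our source system, and the pipeline needs a coder registered for it):

import java.io.Serializable;

// Minimal sketch of one record; names are illustrative only.
public class Event implements Serializable {
  public long timestampMillis; // event time in milliseconds
  public String sessionId;     // one service interaction, from queuing up until done
  public int stateId;          // 0 = in queue, 1 = in service, 2 = done
  public String queueId;       // the window/queue the customer is currently waiting in
}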
My current solution: I used a stateful DoFn. Assuming all the data I receive arrives in event-time order, I record the last state_id and queue_id in state. By comparing the state_id and queue_id of the current element with the previous ones read from state, I can tell whether the state_id or the queue_id changed, and I then emit a record with the delta of the queue size downstream. If the queue_id changed, I also emit a record for the old queue as a kind of "retraction".
This worked very well while the pipeline parallelism was limited to 1, but it stopped working as soon as I increased the parallelism, because the data the stateful DoFn received became out of order. To work around that, I added a DoFn that leverages a Timer to control the order in which it emits data, but I don't know how the timer callback can interact with the data the DoFn received before.
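To make the idea concrete, my stateful DoFn is roughly like the sketch below (simplified; it assumes the input is keyed by session_id and uses the Event class from above):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Keyed by session_id; emits (queue_id, +1/-1) deltas that a downstream
// sum per key can turn into the current queue length.
public class QueueDeltaFn extends DoFn<KV<String, Event>, KV<String, Integer>> {

  @StateId("lastStateId")
  private final StateSpec<ValueState<Integer>> lastStateIdSpec =
      StateSpecs.value(VarIntCoder.of());

  @StateId("lastQueueId")
  private final StateSpec<ValueState<String>> lastQueueIdSpec =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("lastStateId") ValueState<Integer> lastStateId,
      @StateId("lastQueueId") ValueState<String> lastQueueId) {

    Event e = c.element().getValue();
    Integer prevState = lastStateId.read();
    String prevQueue = lastQueueId.read();

    if (prevState == null && e.stateId == 0) {
      // Session seen for the first time and it is queuing: queue grows by one.
      c.output(KV.of(e.queueId, 1));
    } else if (prevState != null && prevState == 0) {
      if (e.stateId != 0) {
        // Left the queue (now in service or done): queue shrinks by one.
        c.output(KV.of(prevQueue, -1));
      } else if (!e.queueId.equals(prevQueue)) {
        // Still queuing but switched queues: "retract" from the old queue, add to the new one.
        c.output(KV.of(prevQueue, -1));
        c.output(KV.of(e.queueId, 1));
      }
    }

    lastStateId.write(e.stateId);
    lastQueueId.write(e.queueId);
  }
}

Downstream I re-key by queue_id and sum the deltas to get the running queue length; that part is fine, the ordering of the input to this DoFn is my problem.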
Could anyone tell me how a timer callback can interact with the data a DoFn received before? Or is there a better/workable solution for this case?
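For what it is worth, the ordering DoFn I am experimenting with looks roughly like the sketch below: I buffer elements in a BagState and set an event-time timer, and I am assuming the @OnTimer callback can read back the same BagState to emit the buffered elements in timestamp order, but I am not sure this is the right way (nor how much slack to allow for late data):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Buffers elements per key and tries to re-emit them in event-time order once the
// watermark (plus some slack) has passed, so the downstream stateful DoFn sees them in order.
public class SortBufferFn extends DoFn<KV<String, Event>, Event> {

  private static final Duration SLACK = Duration.standardSeconds(30); // a guess, needs tuning

  @StateId("buffer")
  private final StateSpec<BagState<Event>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("buffer") BagState<Event> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(c.element().getValue());
    // Ask to be called back once the watermark is SLACK past this element's timestamp.
    flush.set(c.timestamp().plus(SLACK));
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext c,
      @StateId("buffer") BagState<Event> buffer) {
    // The callback reads the same BagState the elements were buffered into.
    List<Event> events = new ArrayList<>();
    for (Event e : buffer.read()) {
      events.add(e);
    }
    events.sort(Comparator.comparingLong(e -> e.timestampMillis));
    for (Event e : events) {
      c.output(e);
    }
    buffer.clear();
  }
}

My question is basically whether this is a legitimate way for the timer callback to "see" data received earlier, and what to do about elements that arrive after the timer has fired.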
Thanks a lot.
Ya-Feng