Posted to dev@pulsar.apache.org by Adam Bellemare <ad...@gmail.com> on 2019/07/03 01:28:06 UTC

Timestamped-based synchronized reads from multiple input topics?

Hi All

The use-case is pretty simple. Let's say we have a history of events with
the following shape:
key=userId, value = (timestamp, productId)

and we want to remap it to:
key=productId, value=(original_timestamp, userId)
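For illustration, the remapping described above could look roughly like this in Java. This is only a sketch: the `Remap` and `Stamped` names are hypothetical and are not part of any Pulsar API.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

final class Remap {
    // Hypothetical value type: a timestamp paired with one id.
    static final class Stamped {
        final long timestamp;
        final String id;

        Stamped(long timestamp, String id) {
            this.timestamp = timestamp;
            this.id = id;
        }
    }

    // Input:  key=userId,    value=(timestamp, productId)
    // Output: key=productId, value=(original_timestamp, userId)
    static Map.Entry<String, Stamped> remap(String userId, Stamped value) {
        return new SimpleImmutableEntry<>(
                value.id, new Stamped(value.timestamp, userId));
    }

    public static void main(String[] args) {
        Map.Entry<String, Stamped> out =
                remap("user-1", new Stamped(1000L, "product-9"));
        System.out.println(out.getKey() + " -> ("
                + out.getValue().timestamp + ", " + out.getValue().id + ")");
    }
}
```

The timestamp is carried through unchanged, which is what makes it usable later for ordering the remapped events.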

Now, say I have 30 days of backlog, and 2 input topics with the original
events. I spin up 2 instances and let them process the data from the
beginning of time, but one instance is only half as powerful (less CPU,
memory, etc.), such that instance 0 processes X events/sec while instance 1
processes X/2 events/sec.
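To get a feel for how quickly the two instances drift apart, here is a small back-of-the-envelope sketch. It assumes, purely for illustration, that both topics hold the same number of events per second of event time. The rates and density in `main` are made-up numbers, and `DriftEstimate` is a hypothetical helper.

```java
final class DriftEstimate {
    /**
     * Wall-clock seconds until the faster instance is maxSkewSec of event
     * time ahead of the slower one, assuming both topics hold
     * eventsPerEventSec events for every second of event time.
     */
    static double secondsUntilSkew(double fastRate, double slowRate,
                                   double eventsPerEventSec, double maxSkewSec) {
        // Event time covered per wall-clock second by each instance.
        double fastAdvance = fastRate / eventsPerEventSec;
        double slowAdvance = slowRate / eventsPerEventSec;
        return maxSkewSec / (fastAdvance - slowAdvance);
    }

    public static void main(String[] args) {
        // X = 1000 events/sec, X/2 = 500 events/sec,
        // 10 events per second of event time, 60 s allowed skew.
        System.out.println(secondsUntilSkew(1000, 500, 10, 60));
    }
}
```

With these example numbers the gap grows by 50 seconds of event time per wall-clock second, so a 60 second window is exceeded after roughly 1.2 seconds of backlog processing.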

My question is: How do I allow these two consumer instances to remain in
sync *according to their timestamps* (not offsets) as they consume from
these topics? I don't want to see events with original_timestamps out of
order by more than, say, 60 seconds.  I am looking for any existing tech
that would effectively say *"cap the time difference of events coming out
of each processor at 60 seconds max".* If one processor is too far ahead of
the other, stop and wait for it to catch up.

To me it seems like the shared consumer may *implicitly* come close to
this, but I am concerned that one input topic could be consumed and
processed much too far ahead of the other, such that the arbitrary 60s
window is not observed. I have been digging through the code, and from my
understanding there is no guarantee on relative offset consumption between
topics; progress is tracked on a per-subscription basis.

Anyways, if anyone has any information they could lend to helping me solve
this sort of qualitative issue I would be very grateful. I'm coming from
the Kafka world so I apologize if my terminology isn't quite on point.

Thanks
Adam

Re: Timestamped-based synchronized reads from multiple input topics?

Posted by David Kjerrumgaard <da...@streaml.io>.
Adam,

That is my understanding as well, that the context is associated with a
namespace, so as long as you run both functions in the same namespace you
shouldn't have any issues.

HTH,
David K.

-- 
Regards,

David Kjerrumgaard
Director - Solution Architecture
Streamlio
Cell: 330-437-5467

Re: Timestamped-based synchronized reads from multiple input topics?

Posted by Adam Bellemare <ad...@gmail.com>.
Hi David

Thanks for the fast reply. Just one follow-up question.

The context object provides access to the state store in BookKeeper -
is that context shared globally across all functions, or is it limited in
scope to a given namespace unique to our two functions in question (i.e.,
the equivalent of a consumer group in Kafka)? From what I've read it looks
like they're separated by namespace, which would be ideal, but I just
wanted to confirm.

Thanks again, and for the pseudo-code as well.

Adam


Re: Timestamped-based synchronized reads from multiple input topics?

Posted by David Kjerrumgaard <da...@streaml.io>.
Hi Adam,

Your best bet for this might be to implement 2 separate stateful Pulsar
Functions, each consuming from a different topic. The basic idea is to use
the stateful context to share information about which timestamp each
function is processing, and to use that information to decide whether or
not to process the next record. Something like the following pseudo-code:

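import java.nio.ByteBuffer;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class SyncFunctionA implements Function<String, String> {
    @Override
    public String process(String input, Context ctx) throws Exception {
        // Topic A: key=userId, value=(timestamp, productId)
        // parseTimestamp is a placeholder: assume you know how to parse the value
        long msgTs = parseTimestamp(input);
        long maxDelta = Long.parseLong(
                ctx.getUserConfigValue("max_delta").get().toString());

        // Wait only while we are ahead of topic B, re-reading its progress
        // on each pass. (If both functions waited on the absolute
        // difference, neither could ever advance.)
        while (msgTs - lastTimestamp(ctx, "topicB_ts_key") > maxDelta) {
            Thread.sleep(100);  // wait until we are "in sync"
        }

        String output = input;  // process the Topic A message here

        ctx.putState("topicA_ts_key",
                ByteBuffer.allocate(Long.BYTES).putLong(0, msgTs));
        return output;
    }

    // Timestamp last stored under key. Assumes getState returns null for a
    // missing key, in which case we never wait.
    private static long lastTimestamp(Context ctx, String key) {
        ByteBuffer buf = ctx.getState(key);
        return buf == null ? Long.MAX_VALUE : buf.getLong(0);
    }
}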
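
The waiting rule can be tried out without a Pulsar cluster. The sketch below is not Pulsar code: it swaps the state store for an in-memory map and replaces the two functions with a deterministic loop in which worker A attempts two events per round and worker B one. All names (`SkewGate`, `mayProcess`, `record`, `simulate`) are hypothetical. It holds back only the worker that is ahead, which is one possible reading of the rule, chosen so that the slower worker is never blocked.

```java
import java.util.HashMap;
import java.util.Map;

final class SkewGate {
    // In-memory stand-in for the shared state store (an assumption made so
    // the sketch runs on its own).
    private final Map<String, Long> lastTs = new HashMap<>();
    private final long maxDelta;

    SkewGate(long maxDelta) {
        this.maxDelta = maxDelta;
    }

    /** True if an event stamped ts is at most maxDelta ahead of the other worker. */
    boolean mayProcess(String other, long ts) {
        Long otherTs = lastTs.get(other);
        return otherTs == null || ts - otherTs <= maxDelta;
    }

    void record(String self, long ts) {
        lastTs.put(self, ts);
    }

    /**
     * Runs the given number of rounds. Each topic holds one event per 1000 ms
     * of event time. Returns {largest observed lead of A over B in ms,
     * number of attempts on which A was held back}.
     */
    static long[] simulate(long maxDelta, int rounds) {
        SkewGate gate = new SkewGate(maxDelta);
        long nextA = 0, nextB = 0;  // timestamp of each worker's next event
        long maxLead = 0, held = 0;
        for (int r = 0; r < rounds; r++) {
            for (int i = 0; i < 2; i++) {  // fast worker: two attempts per round
                if (gate.mayProcess("B", nextA)) {
                    gate.record("A", nextA);
                    if (nextB > 0) {  // B has processed at least one event
                        maxLead = Math.max(maxLead, nextA - (nextB - 1000));
                    }
                    nextA += 1000;
                } else {
                    held++;  // too far ahead: retry on a later attempt
                }
            }
            if (gate.mayProcess("A", nextB)) {  // slow worker: one attempt per round
                gate.record("B", nextB);
                nextB += 1000;
            }
        }
        return new long[] { maxLead, held };
    }

    public static void main(String[] args) {
        long[] result = simulate(60_000L, 200);
        System.out.println("max lead (ms): " + result[0] + ", holds: " + result[1]);
    }
}
```

In this setup the fast worker runs freely until its lead reaches the cap, after which it is throttled to the slow worker's pace, so the reported lead stays at or below the 60 000 ms limit.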

Regards,
David

