Posted to users@kafka.apache.org by Alessandro Tagliapietra <ta...@gmail.com> on 2020/05/12 03:01:40 UTC

Merging multiple streams into one

Hello everyone,

we currently use 3 streams (metrics, events, states) and I need to
implement a keepalive mechanism so that if the machine doesn't send any
data (from a specific list of variables) it'll emit a value that changes
the machine state.

For example, for machine 1 the keepalive variables are "foo" and "bar". To
propagate the list of variables I use a configuration topic read through a
GlobalKTable, so that each stream application can read the machine's
configuration.

Then my idea was to "merge" all the three streams into one so that I can
use a ValueTransformer to:
 - read the configuration store and ignore messages that don't belong to
the configured variables for a machine
 - use a local state store to save the "last seen" time of a machine
 - use the punctuator to emit a "change" in the machine status if the "last
seen" time of a machine is older than some timeout
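
The three steps above can be sketched in plain Java, without any Kafka
dependency. This is only a model of the logic, not a ValueTransformer: the
class name KeepaliveTracker, the two maps standing in for the configuration
store and the local state store, and the choice to forget a machine once it
has been reported are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Plain-Java model of the keepalive logic (hypothetical, no Kafka types). */
final class KeepaliveTracker {
    // Stand-in for the configuration store: machine ID -> keepalive variables.
    private final Map<String, Set<String>> keepaliveVariables;
    // Stand-in for the local state store: machine ID -> last-seen time (ms).
    private final Map<String, Long> lastSeen = new HashMap<>();
    private final long timeoutMs;

    KeepaliveTracker(Map<String, Set<String>> keepaliveVariables, long timeoutMs) {
        this.keepaliveVariables = keepaliveVariables;
        this.timeoutMs = timeoutMs;
    }

    /** Per record: remember the timestamp only for configured variables. */
    void onRecord(String machineId, String variable, long timestampMs) {
        Set<String> configured = keepaliveVariables.get(machineId);
        if (configured == null || !configured.contains(variable)) {
            return; // not a keepalive variable for this machine, ignore it
        }
        // Keep the newest timestamp, since merged input may arrive out of order.
        lastSeen.merge(machineId, timestampMs, Long::max);
    }

    /** Periodic check: return machines whose last-seen time exceeds the timeout. */
    List<String> onPunctuate(long nowMs) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (nowMs - entry.getValue() > timeoutMs) {
                stale.add(entry.getKey());
            }
        }
        // Assumption: forget stale machines so the change is reported once,
        // not again on every later check.
        for (String machineId : stale) {
            lastSeen.remove(machineId);
        }
        return stale;
    }
}
```

In a real topology, onRecord would correspond to the transform call and
onPunctuate to the scheduled punctuator, with both maps replaced by actual
stores.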

To "merge" the 3 streams I was thinking of just mapping them into a single
intermediate topic and having the ValueTransformer read from that.

Is there a better way? Maybe without using an intermediate topic?

Thank you in advance

--
Alessandro Tagliapietra

Re: Merging multiple streams into one

Posted by Alessandro Tagliapietra <ta...@gmail.com>.
Hi Bill,

thank you for replying.

Yes, the keys are all the same type (machine ID string).

Btw, your solution sounds great, but it'll only work if all 3 streams
have the same number of partitions, right?
Otherwise there's no guarantee that all the data for the same machine (the
topic keys are the machine IDs) ends up in the same Streams instance,
whereas the intermediate topic would guarantee that?
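
To illustrate the concern about partition counts, here is a minimal sketch
of hash-based partitioning. It is an assumption-laden stand-in: Kafka's
default partitioner hashes the serialized key bytes with murmur2, while this
example uses String.hashCode, and the class and method names are
hypothetical. The point is only that the partition chosen for a key depends
on the partition count.

```java
/** Simplified stand-in for a hash partitioner (not Kafka's actual code). */
final class PartitionSketch {
    /** Non-negative hash of the key, modulo the number of partitions. */
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    /** True if the key lands on the same partition number in both topics. */
    static boolean samePartition(String key, int partitionsA, int partitionsB) {
        return partitionFor(key, partitionsA) == partitionFor(key, partitionsB);
    }
}
```

With equal partition counts, samePartition is true for every key. With
different counts (say 3 and 4) it is false for some keys, which is the
situation the question above is about.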

Thanks!

--
Alessandro Tagliapietra


On Tue, May 12, 2020 at 7:16 AM Bill Bejeck <bi...@confluent.io> wrote:

> Hi Alessandro,
>
> For merging the three streams, have you considered the `KStream.merge`
> method?
> If the values are of different types, you'll need to map them into a common
> type first, but I think
> something like this will work:
>
> KStream mappedOne = originalStreamOne.mapValues(...);
> KStream mappedTwo = originalStreamTwo.mapValues(...);
> KStream mappedThree = originalStreamThree.mapValues(...);
>
> KStream mergedStream = mappedOne.merge(mappedTwo).merge(mappedThree);
>
> Just keep in mind there are no ordering guarantees for the records of the
> merged streams. Also, I assumed the keys are of the same type; if not,
> you'll have to change the `mapValues` call to a `map`.
>
> HTH,
> Bill

Re: Merging multiple streams into one

Posted by Bill Bejeck <bi...@confluent.io>.
Hi Alessandro,

For merging the three streams, have you considered the `KStream.merge`
method?
If the values are of different types, you'll need to map them into a common
type first, but I think
something like this will work:

KStream mappedOne = originalStreamOne.mapValues(...);
KStream mappedTwo = originalStreamTwo.mapValues(...);
KStream mappedThree = originalStreamThree.mapValues(...);

KStream mergedStream = mappedOne.merge(mappedTwo).merge(mappedThree);

Just keep in mind there are no ordering guarantees for the records of the
merged streams. Also, I assumed the keys are of the same type; if not,
you'll have to change the `mapValues` call to a `map`.
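
As a rough sketch of the "map them into a common type first" step, the
mappers could look like the following. Everything here is hypothetical: the
Metric, Event, State and Reading classes and their fields are invented for
illustration, since the thread does not say what the actual value types
look like.

```java
/** Hypothetical value types and mappers into one common type. */
final class CommonTypeSketch {
    // Assumed per-stream value types; the real ones are not shown in the thread.
    static final class Metric {
        final String variable; final double value;
        Metric(String variable, double value) { this.variable = variable; this.value = value; }
    }
    static final class Event {
        final String variable; final String description;
        Event(String variable, String description) { this.variable = variable; this.description = description; }
    }
    static final class State {
        final String variable; final String status;
        State(String variable, String status) { this.variable = variable; this.status = status; }
    }

    /** Common type: keeps only what a keepalive check would need. */
    static final class Reading {
        final String source; final String variable;
        Reading(String source, String variable) { this.source = source; this.variable = variable; }
    }

    // One mapper per input stream, each tagging the reading with its origin.
    static Reading fromMetric(Metric m) { return new Reading("metrics", m.variable); }
    static Reading fromEvent(Event e) { return new Reading("events", e.variable); }
    static Reading fromState(State s) { return new Reading("states", s.variable); }
}
```

Assuming streams typed with these values, the mappers could then be passed
as method references, for example
`originalStreamOne.mapValues(CommonTypeSketch::fromMetric)`, before the
`merge` calls.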

HTH,
Bill