Posted to users@kafka.apache.org by Artur Mrozowski <ar...@gmail.com> on 2018/05/01 17:40:25 UTC

Re: Workaround for KTable/KTable join followed by groupBy and aggregate/count can result in duplicated results KAFKA-4609?

Hi John,
yes, the answer is very helpful and your understanding of the data flow is
correct. However, deduplication of the input is not the issue, because no
duplicates are ever inserted into the flow.
Rather, the duplicates are generated from unique records after the join
between claims and payments, when the result is converted to a stream.
But perhaps that stream is entirely avoidable?

So it would look something like this:

KTable<String,ArrayList> left
{"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber":
"3_0", "claimtime": 708.521153490306}

and KTable<String,ArrayList> right

{"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0",
"payment": 847015.1437781961}

When I leftJoin these two objects, the result in the state store is an
object containing two ArrayLists, left and right, like this:

{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","
claimreporttime":"55948.33110985625","claimcounter":"
0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}

But I want to continue processing the results with groupBy and aggregate,
so I convert the result of the leftJoin to a stream. Now the resulting
repartition and changelog topics contain two identical messages, like
this:

{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","
claimreporttime":"55948.33110985625","claimcounter":"
0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","
claimreporttime":"55948.33110985625","claimcounter":"
0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}

Best regards
Artur


On Mon, Apr 30, 2018 at 5:30 PM, John Roesler <jo...@confluent.io> wrote:

> Hello Artur,
>
> Apologies in advance if I say something incorrect, as I'm still a little
> new to this project.
>
> If I followed your example, then I think the scenario is that you're
> joining "claims" and "payments", grouping by "claimNumber", and then
> building a list for each "claimNumber" of all the claim/payment pairs. Is
> that right?
>
> It's not in your example, but the grouped stream or table for "claims"
> (claimStrGrouped) and "payments" (paymentGrouped) must be keyed with the
> same key, right? In that case, the result of their join will also be keyed
> by that same key.
>
> It seems like the problem you're seeing is that that list contains the same
> claim/payment pair multiple times for a given claimNumber. Did I get it?
>
> In that case, I don't know if what you're seeing is the same issue Damian
> reported in KAFKA-4609, since the problem he reported was that there was no
> deduping cache after the join, only before it, unless you register a state
> store representing the join itself. In your case, it looks like you do
> register a state store representing the join, the
> "CLAIM_AND_PAYMENT_JOIN_STORE".
> So you will have a cache that can dedup the join result.
>
> Note that the join itself is what causes duplicates, not the subsequent "
> claimAndPaymentKTable.toStream()". For example, if I see input like this:
>
> (left stream):
> t1: k1 -> L1
> t3: k1 -> L1
>
> (right stream):
> t2: k1 -> R1
>
> Then, without deduplication, the resulting join would be:
> (left.join(right) stream):
> t1: k1 -> (L1, null)
> t2: k1 -> (L1, R1)
> t3: k1 -> (L1, R1)
>
> Note that we see apparently duplicate join results, but really the meaning
> of the join stream is that "as of right now, this is the value for this
> key", so from the join's perspective it's not wrong to say "as of t2, k1's
> value is (L1, R1)" and then to say it at t3 again.
>
> In Kafka Streams, there is a deduplication cache which can reduce such
> duplicate events, but without unbounded memory, the cache can't guarantee
> to remove all duplicates, so it's important to deal with the join result in
> a semantically robust way.
>
> I think this also contains the key to resolving your issue; inside your
> aggregator, instead of storing a list of *every event*, I think you'll want
> to store a map of the *latest event by key*. (This would be the key that's
> common to claimStrGrouped, paymentGrouped, and claimAndPaymentKTable). This
> way, you'll automatically overwrite old, obsolete, join results with new
> ones for the same key (whether or not the old result happens to be the same
> as the new one).
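>
> For illustration only, a rough sketch of that idea (the Map serde and the
> claim-number handling are assumptions, borrowed from your code below):
>
>     KTable<Integer, Map<String, ClaimAndPayment>> latestByJoinKey =
>         claimAndPaymentKStream
>             .groupBy((k, claimPay) -> groupKeyFor(claimPay),  // hypothetical helper: same grouping as below
>                      integerSerde, claimAndPaymentSerde)
>             .aggregate(
>                     HashMap::new,
>                     (groupKey, claimPay, agg) -> {
>                         // keep only the latest join result per join key, so a
>                         // re-emitted (L1, R1) simply overwrites the older entry
>                         String joinKey = (claimPay.claimList != null)
>                                 ? claimPay.claimList.claimRecords.get(0).claimnumber
>                                 : "unknown";  // mirrors the null check in your code
>                         agg.put(joinKey, claimPay);
>                         return agg;
>                     },
>                     mapSerde,             // a Serde for the Map value (assumption)
>                     CLAIM_AND_PAYMENT_STORE);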
>
> Does this help?
> -John
>
> On Mon, Apr 30, 2018 at 1:19 AM, Artur Mrozowski <ar...@gmail.com> wrote:
>
> > Hi,
> > a while ago I hit KAFKA-4609 when running a simple pipeline. I have two
> > KTable joins followed by a groupBy and aggregate on a KStream, and one
> > additional join. Now this KTable/KTable join followed by groupBy and
> > aggregate generates duplicates.
> >
> >
> >
> > I wonder if a possible workaround would be to remove the KStream after the
> > KTable/KTable join and do the groupBy and aggregate on the KTable?
> >
> >
> > KTable<Integer,CustomerAndPolicy> customerAndPolicyGroupedKTable =
> >     customerGrouped.leftJoin(policyGrouped,
> >         (customer, policy) -> new CustomerAndPolicy(customer, policy));
> >
> > KTable<String,ClaimAndPayment> claimAndPaymentKTable =
> >     claimStrGrouped.leftJoin(paymentGrouped,
> >         (claim, payment) -> new ClaimAndPayment(claim, payment),
> >         claimAndPaymentSerde, CLAIM_AND_PAYMENT_JOIN_STORE);
> >
> > *KStream<String,ClaimAndPayment> claimAndPaymentKStream =
> >     claimAndPaymentKTable.toStream(); //Can we remove this and avoid KAFKA-4609?*
> >
> > KTable<Integer,ClaimAndPayment2> claimAndPayment2IntGroupedTable =
> >     claimAndPaymentKStream
> >         .groupBy((k, claimPay) ->
> >                 (claimPay.claimList != null)
> >                     ? Integer.parseInt(claimPay.claimList.claimRecords.get(0).claimnumber.split("_")[0])
> >                     : 999,
> >             integerSerde, claimAndPaymentSerde)
> >         .aggregate(
> >                 ClaimAndPayment2::new,
> >                 (claimKey, claimPay, claimAndPay2) -> {
> >                     claimAndPay2.claimAndPaymentList.add(claimPay);
> >                     return claimAndPay2;
> >                 },
> >                 claimAndPayment2Serde,
> >                 CLAIM_AND_PAYMENT_STORE
> >         );
> >
> >
> >
> >
> >
> > Best regards
> > Artur Mrozowski
> >
>

Re: Workaround for KTable/KTable join followed by groupBy and aggregate/count can result in duplicated results KAFKA-4609?

Posted by Artur Mrozowski <ar...@gmail.com>.
Hi John,
thanks. Yes, using hashes is what I already do.
Thank you for the excellent explanation.

Best Regards
Artur


Re: Workaround for KTable/KTable join followed by groupBy and aggregate/count can result in duplicated results KAFKA-4609?

Posted by John Roesler <jo...@confluent.io>.
Hi Artur,

Thanks for the clarification.

I don't think that ".toStream()" actually does anything besides change the
context from KTable to KStream in the DSL. The Javadoc says:

> Note that this is a logical operation and only changes the
> "interpretation" of the stream, i.e., each record of this changelog stream
> is no longer treated as an updated record (cf. {@link KStream} vs
> {@code KTable}).


Not to belabor the point, but I wouldn't want you to focus too much on
getting rid of the "toStream" in favor of the same methods on KTable, as I
think that would have the exact same semantics.
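
Just for completeness, a sketch of what that would look like directly on the
KTable (a sketch only, reusing the names from your code; note that
KGroupedTable#aggregate takes a subtractor in addition to the adder, because
a table update retracts the old value before adding the new one, and the
subtractor relies on a correct equals()):

    KTable<Integer, ClaimAndPayment2> claimAndPayment2IntGroupedTable =
        claimAndPaymentKTable
            .groupBy((k, claimPay) ->
                    // groupKeyFor() is a hypothetical helper standing in for the
                    // same claim-number extraction you use today
                    KeyValue.pair(groupKeyFor(claimPay), claimPay),
                integerSerde, claimAndPaymentSerde)
            .aggregate(
                    ClaimAndPayment2::new,
                    (key, claimPay, agg) -> {         // adder
                        agg.claimAndPaymentList.add(claimPay);
                        return agg;
                    },
                    (key, claimPay, agg) -> {         // subtractor
                        agg.claimAndPaymentList.remove(claimPay);
                        return agg;
                    },
                    claimAndPayment2Serde,
                    CLAIM_AND_PAYMENT_STORE);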

It's entirely possible that some additional tuning on the join could reduce
the duplicates you're seeing. For example, what are your current settings
for commit interval and dedup cache size?
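
For reference, those two settings are cache.max.bytes.buffering and
commit.interval.ms; a sketch with example values only:

    Properties props = new Properties();
    // a larger cache plus a longer commit interval lets Streams collapse more
    // intermediate updates before they are forwarded downstream
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10 MB, example value
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);               // 30 seconds, example value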

In any case, though, Kafka Streams's deduplication mechanism is only
best-effort. So if your correctness depends on unique events (as yours
does), I still think you're better off coding in anticipation of
duplicates. For example, you could implement hashCode and equals on
ClaimAndPayment and store them in a LinkedHashSet (to preserve both
uniqueness and order).
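
A rough sketch of that, assuming ClaimAndPayment carries the claimList and
paymentList fields shown in your JSON (those nested types need equals() as
well):

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ClaimAndPayment)) return false;
        ClaimAndPayment other = (ClaimAndPayment) o;
        return Objects.equals(claimList, other.claimList)
                && Objects.equals(paymentList, other.paymentList);
    }

    @Override
    public int hashCode() {
        return Objects.hash(claimList, paymentList);
    }

The aggregator can then keep a LinkedHashSet<ClaimAndPayment> instead of an
ArrayList, so adding the same join result a second time is a no-op while
insertion order is preserved.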

Hope that helps,
-John

