Posted to users@kafka.apache.org by Mathieu Fenniak <ma...@replicon.com> on 2016/07/20 14:09:55 UTC

Kafka Streams: KTable join + filter + foreach

Hello Kafka users,

I'm seeing some unexpected results when using Kafka Streams, and I was
hoping someone could explain them to me.  I have two streams, which I've
converted KStream->KTable, and then I am joining them together with a
"join" (not an outer join, not a full join).  With the resulting KTable
from the join, I am performing a foreach.

When I start up my Streams application, my foreach receives two records with
valid keys but null values *before* my ValueJoiner ever gets executed.  Why
would that be?

Code excerpt; please excuse the use of Kotlin here:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.KStreamBuilder

// Assumed to be application-specific (not part of Kafka's Streams API):
// JsonSerde, TimesheetApprovalStatusChangedMessage, computeTimesheetStatus.

val builder = KStreamBuilder()

val approvalStatus = builder.table(
        Serdes.String(),
        JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
        "TimesheetApprovalStatusChanged"
)

val timesheetLastApprovalAction = builder.table(
        Serdes.String(),
        JsonSerde(Map::class.java),
        "TimesheetApprovalActionPerformed"
)

// Inner KTable-KTable join: the ValueJoiner is only called when both tables
// have a value for the key.
val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, { approvalStatus, lastApprovalAction ->
    println("EXECUTING ValueJoiner")
    computeTimesheetStatus(approvalStatus.approvalStatus!!,
            lastApprovalAction as Map<String, Any?>)
})

timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
    println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
    if (timesheetStatus == null) {
        println("SKIPPING NULL I DON'T UNDERSTAND")
    }
})


Resulting console output:

EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
SKIPPING NULL I DON'T UNDERSTAND
EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
SKIPPING NULL I DON'T UNDERSTAND
EXECUTING ValueJoiner
EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting


Any explanation on why the foreach would be executing for data that hasn't
been generated by my join?

Thanks,

Mathieu

Re: Kafka Streams: KTable join + filter + foreach

Posted by Guozhang Wang <wa...@gmail.com>.
I think this is a bug in 0.10.0 that we already fixed in trunk some
time ago.

Guozhang

On Wed, Jul 20, 2016 at 12:25 PM, Mathieu Fenniak <
mathieu.fenniak@replicon.com> wrote:

> I'm using the 0.10.0.0 release.
>
> Matthias's suggestion of using .toStream().filter(...).foreach(...) does
> prevent the nulls from reaching the foreach.  But
> .filter(...).foreach(...) does not; the filter's predicate is not even
> executed before the ForeachAction receives the null records.



-- 
-- Guozhang

Re: Kafka Streams: KTable join + filter + foreach

Posted by Mathieu Fenniak <ma...@replicon.com>.
I'm using the 0.10.0.0 release.

Matthias's suggestion of using .toStream().filter(...).foreach(...) does
prevent the nulls from reaching the foreach.  But
.filter(...).foreach(...) does not; the filter's predicate is not even
executed before the ForeachAction receives the null records.



On Wed, Jul 20, 2016 at 12:30 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Try to do a
>
> .toStream().filter(...).foreach(...)
>
>
> -Matthias

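To make the 0.10.0.0 symptom concrete, here is a small, dependency-free Kotlin toy model of a table-level filter. It reproduces what Mathieu reports: the predicate is never evaluated for null values, and those nulls still reach the ForeachAction. `toyTableFilter` is a hypothetical name, records are plain `(key, value)` pairs, and treating a value that fails the predicate as a forwarded tombstone is an assumption of the sketch, not something tested in this thread.

```kotlin
// Hypothetical toy model, not Kafka code: a record is a (key, value) pair and
// a null value is a tombstone ("delete this key").
fun toyTableFilter(
    records: List<Pair<String, String?>>,
    pred: (String, String) -> Boolean
): List<Pair<String, String?>> = records.map { (key, value) ->
    // Null values bypass the predicate and are forwarded unchanged, as
    // reported for 0.10.0.0. Assumption: a value that fails the predicate
    // is also forwarded, as a tombstone.
    if (value != null && pred(key, value)) key to value else key to null
}

fun main() {
    var predicateCalls = 0
    // Same shape as the join output in the original post: two nulls, then a status.
    val joinOutput = listOf<Pair<String, String?>>("k" to null, "k" to null, "k" to "submitting")
    val filtered = toyTableFilter(joinOutput) { _, v -> predicateCalls++; v.isNotEmpty() }
    filtered.forEach { (k, v) -> println("ForeachAction: $k, status: $v") }
    println("predicate calls: $predicateCalls") // the predicate only saw the non-null value
}
```

Guozhang describes this as a 0.10.0 bug already fixed in trunk; the model only reproduces the symptom, not any fix.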
Re: Kafka Streams: KTable join + filter + foreach

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Try to do a

.toStream().filter(...).foreach(...)


-Matthias


On 07/20/2016 08:11 PM, Guozhang Wang wrote:
> Are you using the 0.10.0.0 release or from trunk?


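Matthias's workaround moves the filtering onto a record stream. Applied to the original code it would read roughly `timesheetStatus.toStream().filter { _, status -> status != null }.foreach { key, status -> ... }`. The sketch below is a dependency-free toy model of why that helps, assuming (consistent with Mathieu's follow-up) that a stream filter evaluates its predicate on every record, null values included; `toyStreamFilter` is a hypothetical name and this is not Kafka's implementation.

```kotlin
// Hypothetical toy model, not Kafka code: a record is a (key, value) pair and
// a null value is a tombstone.

// Record-stream filter: the predicate is evaluated for every record, including
// those with a null value, so a predicate such as `v != null` can drop them.
fun toyStreamFilter(
    records: List<Pair<String, String?>>,
    pred: (String, String?) -> Boolean
): List<Pair<String, String?>> = records.filter { (key, value) -> pred(key, value) }

fun main() {
    // Same shape as the join output in the original post: two nulls, then a status.
    val joinOutput = listOf<Pair<String, String?>>("k" to null, "k" to null, "k" to "submitting")
    toyStreamFilter(joinOutput) { _, v -> v != null }
        .forEach { (k, v) -> println("ForeachAction: $k, status: $v") }
}
```

The trade-off is that after toStream() the records are handled as independent events rather than table updates, so a dropped null no longer deletes anything downstream; for a side-effecting foreach that is usually what you want.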
Re: Kafka Streams: KTable join + filter + foreach

Posted by Guozhang Wang <wa...@gmail.com>.
Are you using the 0.10.0.0 release or from trunk?

On Wed, Jul 20, 2016 at 10:58 AM, Mathieu Fenniak <
mathieu.fenniak@replicon.com> wrote:

> Hi Guozhang,
>
> Yes, I tried to apply the filter on the KTable that came from the join, and
> then the foreach on the KTable that came from the filter.  I was still getting
> the nulls through to my foreach.
>
> It is easy to work around, but the behaviour was especially surprising when
> the filter didn't prevent it.
>
> Mathieu



-- 
-- Guozhang

Re: Kafka Streams: KTable join + filter + foreach

Posted by Mathieu Fenniak <ma...@replicon.com>.
Hi Guozhang,

Yes, I tried to apply the filter on the KTable that came from the join, and
then the foreach on the KTable that came from the filter.  I was still getting
the nulls through to my foreach.

It is easy to work around, but the behaviour was especially surprising when
the filter didn't prevent it.

Mathieu


On Wed, Jul 20, 2016 at 11:57 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Mathieu,
>
> As Matthias said, we are working on improving the current join semantics:
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287
>
> and will keep you updated.
>
>
> As for KTable.filter(), I think it can actually achieve what you want: not
> forwarding nulls to the downstream operators; have you tried it and found
> that it is not working?
>
>
> Guozhang

Re: Kafka Streams: KTable join + filter + foreach

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Mathieu,

As Matthias said, we are working on improving the current join semantics:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=63407287

and will keep you updated.


As for KTable.filter(), I think it can actually achieve what you want: not
forwarding nulls to the downstream operators; have you tried it and found
that it is not working?


Guozhang



On Wed, Jul 20, 2016 at 7:42 AM, Mathieu Fenniak <
mathieu.fenniak@replicon.com> wrote:

> Hm... OK, I think that makes sense.
>
> It seems like I can't filter out those tombstone records; is that expected
> as well?  If I throw in a .filter operation before my foreach, its
> Predicate is not invoked, and the foreach's ForeachAction is invoked with a
> null value still.
>
> Mathieu
>
>
> On Wed, Jul 20, 2016 at 8:23 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > Hi Mathieu,
> >
> > Join semantics are tricky. We are still working on better
> > documentation for them...
> >
> > For the current state and your question:
> >
> > Each time a record is processed, it looks up the other KTable to see if
> > there is a matching record. If none is found, the join result is empty
> > and a tombstone record with <key:null> is sent downstream. This happens
> > to delete any (possibly existing) previous join result for this key;
> > keep in mind that the result is a KTable containing the current state
> > of the join.
> >
> > This happens both ways; thus, if your first records of each stream do
> > not match on the key, both result in a <key:null> message to delete
> > possibly existing join tuples in the result KTable.
> >
> > Does this make sense to you?
> >
> > -Matthias
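Matthias's explanation can be made concrete with a small in-memory toy model of a KTable-KTable inner join. It is an illustration under stated assumptions (each side keeps only its latest value per key, every update probes the other side, and a missing match emits a null), not the Kafka Streams implementation; `ToyTableJoin`, `updateLeft` and `updateRight` are hypothetical names.

```kotlin
// Hypothetical in-memory toy model of a KTable-KTable inner join; not the
// Kafka Streams implementation. A null value in an emitted pair is a
// tombstone meaning "delete any previous join result for this key".
class ToyTableJoin<K, L : Any, R : Any, V : Any>(private val joiner: (L, R) -> V) {
    private val left = mutableMapOf<K, L>()   // latest value per key, left table
    private val right = mutableMapOf<K, R>()  // latest value per key, right table
    val emitted = mutableListOf<Pair<K, V?>>()

    // A left-side update probes the right side; with no match the joiner is
    // skipped and a tombstone is emitted instead.
    fun updateLeft(key: K, value: L) {
        left[key] = value
        emitted += key to right[key]?.let { joiner(value, it) }
    }

    // Symmetric: a right-side update probes the left side.
    fun updateRight(key: K, value: R) {
        right[key] = value
        emitted += key to left[key]?.let { joiner(it, value) }
    }
}

fun main() {
    val join = ToyTableJoin<String, String, String, String> { l, r -> "$l+$r" }
    join.updateLeft("a", "L1")   // first left record: no match on "a"
    join.updateRight("b", "R1")  // first right record: no match on "b"
    join.updateRight("a", "R2")  // both sides now have "a": joiner runs
    join.emitted.forEach { (k, v) -> println("$k -> $v") }
}
```

In this model the joiner is never called for the tombstones, which is how a downstream ForeachAction can see nulls before the ValueJoiner ever prints anything.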
> > > EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982,
> > > status: urn:replicon:timesheet-status:submitting
> > >
> > >
> > > Any explanation on why the foreach would be executing for data that
> > hasn't
> > > been generated by my join?
> > >
> > > Thanks,
> > >
> > > Mathieu
> > >
> >
> >
>



-- 
-- Guozhang

Re: Kafka Streams: KTable join + filter + foreach

Posted by Mathieu Fenniak <ma...@replicon.com>.
Hm... OK, I think that makes sense.

It seems like I can't filter out those tombstone records; is that expected
as well?  If I throw in a .filter operation before my foreach, its
Predicate is not invoked, and the foreach's ForeachAction is invoked with a
null value still.
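The behaviour described here is consistent with how a table-level filter has to work. It cannot simply swallow a change, because a downstream table would then keep a row that should be gone. The sketch below models that idea. It is not the actual `KTable.filter` implementation, and `Change`, `tableFilter`, and `filterExample` are hypothetical names. In the model, a null value is forwarded without the predicate being called, and a row that fails the predicate is itself turned into a tombstone.

```kotlin
// Hypothetical toy model of table-style filtering; NOT Kafka Streams code.

/** One change in a table's changelog; a null value is a tombstone ("delete key"). */
data class Change<K, V>(val key: K, val value: V?)

fun <K, V : Any> tableFilter(
    changes: List<Change<K, V>>,
    predicate: (K, V) -> Boolean
): List<Change<K, V>> = changes.map { c ->
    val v = c.value
    if (v == null) c                          // tombstone: predicate never called, forwarded as-is
    else if (predicate(c.key, v)) c           // row passes: forwarded unchanged
    else Change<K, V>(c.key, null)            // row rejected: downstream must delete it
}

/** Returns the filtered changes and how many times the predicate actually ran. */
fun filterExample(): Pair<List<Change<String, String>>, Int> {
    var predicateCalls = 0
    val input = listOf(Change<String, String>("k", null), Change("k", "submitting"))
    val out = tableFilter(input) { _, status ->
        predicateCalls++
        status != "rejected"
    }
    return out to predicateCalls
}
```

In `filterExample`, the predicate runs once, for the non-null row only, and the leading null still comes out the other side.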

Mathieu


Re: Kafka Streams: KTable join + filter + foreach

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi Mathieu,

Join semantics are tricky. We are still working on better
documentation for them...

For the current state and your question:

Each time a record is processed, it looks up the other KTable to see if
there is a matching record. If none is found, the join result is empty
and a tombstone record <key:null> is sent downstream. This is done to
delete any (possibly existing) previous join result for this key; keep
in mind that the result is a KTable containing the current state of the
join.

This happens in both directions. Thus, if the first records of each
stream do not match on the key, each of them results in a <key:null>
message that deletes any possibly existing join tuple in the result
KTable.
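To make the tombstone behaviour concrete, here is a small in-memory model of the semantics described above. It is a sketch, not Kafka Streams code. `Change`, `ToyInnerJoin`, and `replayExample` are hypothetical names, and the model ignores input-side deletes, old values, and caching. The arrival order in `replayExample` is only one order that produces the same null, null, value pattern seen in the original post. It is not a claim about what actually happened there.

```kotlin
// Hypothetical toy model of a KTable-KTable inner join as described in this thread.
// NOT Kafka Streams code: no input deletes, old values, caching, or partitions.

/** One change sent downstream; a null value is a tombstone ("delete this key"). */
data class Change<K, V>(val key: K, val value: V?)

class ToyInnerJoin<K, L : Any, R : Any, J>(private val joiner: (L, R) -> J) {
    private val left = mutableMapOf<K, L>()    // current state of the left table
    private val right = mutableMapOf<K, R>()   // current state of the right table
    val emitted = mutableListOf<Change<K, J>>()

    fun onLeft(key: K, value: L) {
        left[key] = value
        // Look up the other table; with no match, emit a tombstone so that any
        // previous join result for this key is deleted from the result table.
        val match = right[key]
        emitted.add(Change(key, if (match != null) joiner(value, match) else null))
    }

    fun onRight(key: K, value: R) {
        right[key] = value
        val match = left[key]
        emitted.add(Change(key, if (match != null) joiner(match, value) else null))
    }
}

fun replayExample(): List<Change<String, String>> {
    val join = ToyInnerJoin<String, String, String, String> { status, action ->
        "$status+$action"  // stands in for the ValueJoiner
    }
    val key = "timesheet-1"
    join.onLeft(key, "status-v1")   // no right-side row yet -> tombstone
    join.onLeft(key, "status-v2")   // still no match -> another tombstone
    join.onRight(key, "action-v1")  // match found -> joiner runs
    return join.emitted
}
```

The joiner runs only on the third change. The two earlier changes are tombstones that a downstream foreach would still receive.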

Does this make sense to you?

-Matthias

On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> Hello Kafka users,
> 
> I'm seeing some unexpected results when using Kafka Streams, and I was
> hoping someone could explain them to me.  I have two streams, which I've
> converted KStream->KTable, and then I am joining them together with a
> "join" (not an outer join, not a full join).  With the resulting KTable
> from the join, I am performing a foreach.
> 
> When I start up my Streams application, my foreach receives two records with
> valid keys but null values *before* my ValueJoiner ever gets executed.  Why
> would that be?
> 
> Code excerpt; please excuse the use of Kotlin here:
> 
> val builder = KStreamBuilder()
> 
> val approvalStatus = builder.table(
>         Serdes.String(),
>         JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
>         "TimesheetApprovalStatusChanged"
> )
> 
> val timesheetLastApprovalAction = builder.table(
>         Serdes.String(),
>         JsonSerde(Map::class.java),
>         "TimesheetApprovalActionPerformed"
> )
> 
> val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
>     approvalStatus, lastApprovalAction ->
>     println("EXECUTING ValueJoiner")
>     computeTimesheetStatus(approvalStatus.approvalStatus!!,
>             lastApprovalAction as Map<String, Any?>)
> })
> 
> timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
>     println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
>     if (timesheetStatus == null) {
>         println("SKIPPING NULL I DON'T UNDERSTAND")
>     }
> })
> 
> 
> Resulting console output:
> 
> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> SKIPPING NULL I DON'T UNDERSTAND
> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> SKIPPING NULL I DON'T UNDERSTAND
> EXECUTING ValueJoiner
> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
> 
> 
> Any explanation on why the foreach would be executing for data that hasn't
> been generated by my join?
> 
> Thanks,
> 
> Mathieu
>