Posted to jira@kafka.apache.org by "Eugen Feller (JIRA)" <ji...@apache.org> on 2018/11/19 18:40:01 UTC

[jira] [Comment Edited] (KAFKA-7634) Punctuate not being called with merge() and/or outerJoin()

    [ https://issues.apache.org/jira/browse/KAFKA-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690159#comment-16690159 ] 

Eugen Feller edited comment on KAFKA-7634 at 11/19/18 6:39 PM:
---------------------------------------------------------------

Sure, let me see what I can do. The code looks like this:

 
{code:java}
val table =
  bldr
    .table[Key, Value1](
      keySerde,
      valueSerde1,
      topicA,
      stateStoreName
    )

val stream1 =
  bldr
    .stream[Key, Value2](
      keySerde,
      valueSerde2,
      topicB
    )
    .filterNot((k: Key, s: Value2) => s == null)

val enrichedStream = stream1
  .leftJoin[Value1, Value3](
    table,
    joiner,
    keySerde,
    valueSerde2
  )

val explodedStream =
  bldr
    .stream[Key, Value4](
      keySerde,
      valueSerde4,
      topicC
    )
    .flatMapValues[Value3]()

val mergedStream = bldr.merge[Key, Value3](enrichedStream, explodedStream)
mergedStream.transform[Key, Value3](transformer).to(keySerde, valueSerde3, outputTopic){code}
 


was (Author: efeller):
Sure, let me see what I can do. The code looks like this:

 
{code:java}
val table =
  bldr
    .table[Key, Value1](
      keySerde,
      valueSerde1,
      topicA,
      stateStoreName
    )

val stream1 =
  bldr
    .stream[Key, Value2](
      keySerde,
      valueSerde2,
      topicB
    )
    .filterNot((k: Key, s: Value2) => s == null)

val enrichedStream = stream1
  .leftJoin[Value1, Value3](
    table,
    joiner,
    keySerde,
    valueSerde2
  )

val explodedStream =
  bldr
    .stream[Mac, Value4](
      keySerde,
      valueSerde4,
      topicC
    )
    .flatMapValues[Value3]()

val mergedStream = bldr.merge[Key, Value3](enrichedStream, explodedStream)
mergedStream.transform[Key, Value3](transformer).to(keySerde, valueSerde3, outputTopic){code}
 

> Punctuate not being called with merge() and/or outerJoin()
> ----------------------------------------------------------
>
>                 Key: KAFKA-7634
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7634
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.3
>            Reporter: Eugen Feller
>            Priority: Major
>
> Hi all,
> I am using the Processor API and having trouble getting Kafka Streams v0.11.0.3 to call the punctuate() function after a merge() and/or outerJoin(). Specifically, my topology does flatMapValues() -> merge() and/or outerJoin() -> transform(). If I don't call merge() and/or outerJoin() before transform(), punctuate() is called as expected.
> Thank you very much in advance for your help.


