Posted to users@kafka.apache.org by Nan Xu <na...@gmail.com> on 2019/01/28 22:37:18 UTC

kafka stream depends on its own derived table

hi,
I was writing a simple stream app. All it does is have a producer send a
sequence of paths and values, for example
path /0,  value 1
path /0/1,  value 2
path /0/1/2, value 3
and the Kafka Streams app takes that input and produces a KTable store.

There is a rule: if the parent path does not exist, the child cannot be
inserted. So if /0/1 is not there, the insert of /0/1/2 should be filtered out.

I use the following program to process it. The paths are sent as /0, /0/1,
/0/1/2, ....., /0/1/../9.

Because the filter depends on the KTable store, which is built after the
filter in the topology, when the filter checks whether a path's parent exists,
the parent may already have passed the filter but not yet be in the store, so
the filter thinks the parent does not exist. This is more a problem of async
processing: the parent is not fully done (written to the store) when the next
element starts being processed (in the filter).
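
(This ordering is visible in the topology itself: once the topology below is
built, printing its description shows the filter processor sitting upstream of
the reduce node that owns the store, e.g.:)

  println(topology.describe())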

Another problem is that, because the parent key and the child key are
different, the order in which paths arrive can differ from the order the
producer sent them, which also causes children to be filtered out. The
producer sends /0, /0/1, /0/1/2, ... but the broker may receive them as /0,
/0/1/2, /0/1, ..... and then all the following paths get filtered out, because
/0/1/2 never gets a chance to be created.
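
(One common cause of this reordering, I think, is partitioning: the default
partitioner hashes each key, so a parent and its child key can land on
different partitions, and Kafka only guarantees ordering within a partition. A
minimal sketch of that, assuming a hypothetical 4-partition "input" topic and
using the same hash the default partitioner applies to keyed records:)

import org.apache.kafka.common.utils.Utils

object PartitionCheck extends App {
  val numPartitions = 4 // assumption: the real topic's partition count
  Seq("/0", "/0/1", "/0/1/2", "/0/1/2/3").foreach { key =>
    // same computation the default partitioner uses for a record with a key
    val partition = Utils.toPositive(Utils.murmur2(key.getBytes("UTF-8"))) % numPartitions
    println(s"key $key -> partition $partition")
  }
}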

Any thoughts on how to solve this?

Thanks,
Nan


import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.Serdes.{IntegerSerde, StringSerde}
import org.apache.kafka.common.serialization.{IntegerSerializer, Serdes, StringSerializer}
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{Materialized, Reducer}
import org.apache.kafka.streams.state.{KeyValueStore, QueryableStoreTypes}
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

  val streamProperties = new Properties()
  streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application1")
  streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  streamProperties.put(StreamsConfig.CLIENT_ID_CONFIG, "important-test-client")
  streamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[StringSerde].getName)
  streamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[IntegerSerde].getName)

  val streamBuilder = new StreamsBuilder()
  val topic = "input"

  val inputStream = streamBuilder.stream[String, Integer](topic)

  val materialized = Materialized.as[String, Integer, KeyValueStore[Bytes, Array[Byte]]](topic + "_store")
    .withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer())

  val reducer = new Reducer[Integer](){
    override def apply(value1: Integer, value2: Integer): Integer = {
      value2
    }
  }

  // the value is not important, we only care about the key
  val ktable = inputStream.filter(filter).groupByKey().reduce(reducer, materialized)

  // make sure the parent exists; `store` is the interactive-query handle defined below, after streams.start()
  def filter(key: String, value: Integer): Boolean = {
    println("===current store===, checking key " + key + " value: " + value)
    store.all().forEachRemaining(x => println(x.key))
    val parent = key.trim().substring(0, key.lastIndexOf("/"))
    if (parent == "") {
      true
    } else {
      if (store.get(parent) == null) {
        println("not found parent" + parent)
        false
      } else {
        true
      }
    }
  }

  val topology = streamBuilder.build()
  val streams = new KafkaStreams(topology, streamProperties)
  streams.start()

  Thread.sleep(6000)
  val storeName = ktable.queryableStoreName()
  val store = streams.store(storeName, QueryableStoreTypes.keyValueStore[String, Integer])


  val senderProperties = new Properties()
  senderProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  senderProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer")
  senderProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  senderProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[IntegerSerializer].getName)
  val producer = new KafkaProducer[String, Integer](senderProperties)


  for(j <- 1 until 10) {
    val path = for(i <- 0 until j) yield {
      "/" + i
    }
    producer.send(new ProducerRecord(topic, path.mkString(""), j))
  }

  Thread.sleep(3000)
  println("====show final store state====")
  store.all().forEachRemaining(x => println(x.key))

Thread.sleep(10000000)


output:
===current store===, checking key /0 value: 1
===current store===, checking key /0/1 value: 2
/0
===current store===, checking key /0/1/2/3 value: 4
/0/1
/0
not found parent/0/1/2
===current store===, checking key /0/1/2 value: 3
/0/1
/0
===current store===, checking key /0/1/2/3/4/5 value: 6
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4
===current store===, checking key /0/1/2/3/4 value: 5
/0/1
/0
/0/1/2
not found parent/0/1/2/3
===current store===, checking key /0/1/2/3/4/5/6 value: 7
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5
===current store===, checking key /0/1/2/3/4/5/6/7/8 value: 9
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5/6/7
===current store===, checking key /0/1/2/3/4/5/6/7 value: 8
/0/1
/0
/0/1/2
not found parent/0/1/2/3/4/5/6
====show final store state====
/0/1
/0
/0/1/2

Re: kafka stream depends on its own derived table

Posted by "Matthias J. Sax" <ma...@confluent.io>.
It might be easier to use a `Transformer` with a state store. Each time
you receive an input record, you first check if the parent entry is in
the store. If yes, add the new record, otherwise not.
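
A rough, untested sketch of that idea (the store name "path-store", the
supplier, and the `validated` stream are just illustrative names; it reuses
`streamBuilder` and `inputStream` from your code): the store is attached to
the transformer itself, so the parent check and the insert happen
synchronously on the same task, instead of reading a KTable store that is
only updated further downstream.

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

  // register a store that the transformer will use for the parent lookups
  streamBuilder.addStateStore(
    Stores.keyValueStoreBuilder(
      Stores.persistentKeyValueStore("path-store"),
      Serdes.String(), Serdes.Integer()))

  val supplier = new TransformerSupplier[String, Integer, KeyValue[String, Integer]] {
    override def get(): Transformer[String, Integer, KeyValue[String, Integer]] =
      new Transformer[String, Integer, KeyValue[String, Integer]] {
        var store: KeyValueStore[String, Integer] = _

        override def init(context: ProcessorContext): Unit =
          store = context.getStateStore("path-store").asInstanceOf[KeyValueStore[String, Integer]]

        override def transform(key: String, value: Integer): KeyValue[String, Integer] = {
          val parent = key.substring(0, key.lastIndexOf("/"))
          if (parent == "" || store.get(parent) != null) {
            store.put(key, value)   // insert first, then forward downstream
            KeyValue.pair(key, value)
          } else {
            null                    // parent not present: drop the record
          }
        }

        override def close(): Unit = {}
      }
  }

  // connect the store to the transformer by name
  val validated = inputStream.transform(supplier, "path-store")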

-Matthias
