Posted to users@kafka.apache.org by Yair Halberstadt <ya...@doubleverify.com.INVALID> on 2020/01/20 11:27:29 UTC

Does Merging two kafka-streams preserve co-partitioning

Hi
I asked this question on stack-overflow and was wondering if anyone here
could answer it:
https://stackoverflow.com/questions/59820243/does-merging-two-kafka-streams-preserve-co-partitioning


I have 2 co-partitioned Kafka topics. One contains automatically generated
data, and the other manual overrides.

I want to merge them and filter out any automatically generated data that
has already been manually overridden, and then forward everything to a
combined ChangeLog topic.

To do so I create a stream from each topic, and [merge the streams](https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KStream.html#merge-org.apache.kafka.streams.kstream.KStream-)
using the DSL API.

I then apply the following transform, which stores any manual data, and
deletes any automatic data which has already been manually overridden:
(Scala, but should be pretty easy to understand if you know Java)

```scala
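import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

class FilterManuallyClassifiedTransformer(manualOverridesStoreName: String)
  extends Transformer[Long, Data, KeyValue[Long, Data]] {

  // Array[Byte] used as dummy value since we don't use the value
  var store: KeyValueStore[Long, Array[Byte]] = _

  override def init(context: ProcessorContext): Unit = {
    store = context.getStateStore(manualOverridesStoreName)
      .asInstanceOf[KeyValueStore[Long, Array[Byte]]]
  }

  override def close(): Unit = {}

  override def transform(key: Long, value: Data): KeyValue[Long, Data] = {
    if (value.getIsManual) {
      // Remember that this key has a manual override, then forward it
      store.put(key, Array.emptyByteArray)
      new KeyValue(key, value)
    }
    else if (store.get(key) == null) {
      // No manual override seen for this key: forward the automatic record
      new KeyValue(key, value)
    }
    else {
      // Already manually overridden: returning null forwards nothing
      null
    }
  }
}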
```

If I understand correctly, there is no guarantee this will work unless
manual data and automatic data with the same key are in the same partition.
Otherwise the manual override might be stored in a different state store to
the one that the automatic data checks.
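
To make the concern concrete, here is a minimal sketch of how a key maps to a partition. It assumes a stand-in partitioner: `partitionFor` and `sameTask` are hypothetical names, and the hash used is not Kafka's default murmur2-based one. The only point is that the same key lands in the same partition number in both topics when both use the same partitioning function and the same partition count.

```scala
object CoPartitioningSketch {
  // Stand-in partitioner (assumption): NOT Kafka's murmur2-based default,
  // just a deterministic hash of the key modulo the partition count.
  def partitionFor(key: Long, numPartitions: Int): Int =
    Math.floorMod(java.lang.Long.hashCode(key), numPartitions)

  // The manual and automatic records for a key meet in the same task
  // (and so the same state store) only if both topics route the key
  // to the same partition number.
  def sameTask(key: Long, autoPartitions: Int, manualPartitions: Int): Boolean =
    partitionFor(key, autoPartitions) == partitionFor(key, manualPartitions)
}
```

With equal partition counts `sameTask` holds for every key; with different counts it can fail for some keys, which is the case where the override would end up in a different state store.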

And even if they are stored in the same StateStore there might be race
conditions, where an automatic record checks the state store, then the manual
override is added to the state store, then the manual override is written
to the output topic, then the automatic record is written to the output
topic, leading to the automatic data overwriting the manual override.

Is that correct?

And if so will `merge` preserve the co-partitioning guarantee I need?

Thanks for your help

Re: Does Merging two kafka-streams preserve co-partitioning

Posted by Yair Halberstadt <ya...@doubleverify.com.INVALID>.
Thanks John!

I don't think transformValues will work here because I need to remove
records which already have manual data?

Either way it doesn't matter too much as I just write them straight to
Kafka.

Thanks for your help!

On Mon, Jan 20, 2020 at 4:48 PM John Roesler <vv...@apache.org> wrote:

> Hi Yair,
>
> You should be fine!
>
> Merging does preserve copartitioning.
>
> Also processing on that partition is single-threaded, so you don’t have to
> worry about races on the same key in your transformer.
>
> Actually, you might want to use transformValues to inform Streams that you
> haven’t changed the key. Otherwise, it would need to repartition the result
> before you could do further stateful processing.
>
> I hope this helps!
>
> Thanks,
> John
>

Re: Does Merging two kafka-streams preserve co-partitioning

Posted by John Roesler <vv...@apache.org>.
Hi Yair,

You should be fine! 

Merging does preserve copartitioning.

Also processing on that partition is single-threaded, so you don’t have to worry about races on the same key in your transformer.
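
As a rough illustration of why in-order processing removes the race, here is a minimal sketch that replays the transformer's logic over the records of a single partition, one at a time. It uses no Kafka classes: `Data`, `step`, and `run` are hypothetical stand-ins, and a mutable set plays the role of the state store.

```scala
import scala.collection.mutable

object SingleThreadedFilterSketch {
  // Hypothetical stand-in for the Data type; only the manual flag matters here.
  final case class Data(isManual: Boolean, payload: String)

  // Mirrors the transformer's decision: Some(record) to forward, None to drop.
  def step(store: mutable.Set[Long], key: Long, value: Data): Option[(Long, Data)] =
    if (value.isManual) { store += key; Some(key -> value) }
    else if (!store.contains(key)) Some(key -> value)
    else None

  // Records of one partition are handled strictly one after another, so a
  // store lookup and a store update for the same key can never interleave.
  def run(records: Seq[(Long, Data)]): Seq[(Long, Data)] = {
    val store = mutable.Set.empty[Long]
    records.flatMap { case (k, v) => step(store, k, v) }
  }
}
```

In this model an automatic record that arrives after the manual override for the same key is always dropped, and one that arrives before it is forwarded first, so the override is written later and wins in the output.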

Actually, you might want to use transformValues to inform Streams that you haven’t changed the key. Otherwise, it would need to repartition the result before you could do further stateful processing. 

I hope this helps!

Thanks,
John
