Posted to dev@kafka.apache.org by "Andrey L. Neporada" <an...@yandex-team.ru> on 2016/12/26 12:52:26 UTC

custom offsets in ProduceRequest

Hi all!

Suppose you have two Kafka clusters and want to replicate topics from the primary cluster to the secondary one.
It would be very convenient for readers if the message offsets in the replicated topics were the same as in the primary topics.

As far as I know, there is currently no way to achieve this.
I wonder whether it would be possible/reasonable to add a message offset to ProduceRequest?


—
Andrey Neporada




Re: custom offsets in ProduceRequest

Posted by "Andrey L. Neporada" <an...@yandex-team.ru>.
Hi!

> On 30 Dec 2016, at 09:52, radai <ra...@gmail.com> wrote:
> 
> or even better - if topic creation is done dynamically by the replicator,
> setting the initial offsets for partitions could be made part of topic
> creation? even fewer API changes this way
> 
> On Thu, Dec 29, 2016 at 10:49 PM, radai <ra...@gmail.com> wrote:
> 
>> ah, I didn't realize we are limiting the discussion to master --> slave.
>> 
>> but - if we're talking about master-slave replication, and under the
>> conditions i outlined earlier (src and dest match in #partitions, no
>> foreign writes to dest) it "just works", seems to me the only thing you're
>> really missing is not an explicit desired offset param on each and every
>> request, but just the ability to "reset" the starting offset on the dest
>> cluster at topic creation.
>> 

Unfortunately, resetting the starting offset at topic creation is not enough.
Kafka currently supports log compaction; in other words, message offsets can be non-contiguous within a partition.
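
To make the compaction issue concrete, here is a minimal Python sketch. The offsets and the helper name `replicate_sequentially` are made up for illustration; it only models a destination that hands out contiguous offsets from a chosen starting point, which is what a plain append does.

```python
def replicate_sequentially(source_offsets, dest_start_offset):
    """Map each surviving source offset to the offset a plain append would assign."""
    mapping = {}
    next_offset = dest_start_offset
    for src in source_offsets:
        mapping[src] = next_offset
        next_offset += 1
    return mapping


# Offsets that survive compaction on the source partition (hypothetical data).
compacted_source = [100, 101, 104, 109]

mapping = replicate_sequentially(compacted_source, dest_start_offset=100)
mismatched = [src for src, dst in mapping.items() if src != dst]

print(mapping)     # {100: 100, 101: 101, 104: 102, 109: 103}
print(mismatched)  # [104, 109]
```

Even with the starting offset aligned, every message after the first compaction gap lands on a different offset in the destination.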

Adding a new request, ResetNextOffset(partition, offset), would do the job; however, I think it would be easier to extend ProduceRequest than to introduce ResetNextOffset.
Let me try to explain it in more detail.

Current ProduceRequest works as follows:

1) get messageSet from client
2) locate the offset of the last message in log (say N)
3) validate messageSet and assign the offsets to messageSet starting from N+1
4) save messageSet to log

Proposed ProduceRequest should work this way:

1) get messageSet along with messageSetStartOffset (say S) from client
2) locate the offset of the last message in log (say N)
3) if S < N + 1, report error to client
4) otherwise validate messageSet and assign the offsets to messageSet starting from S
5) save messageSet to log
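
The proposed steps can be sketched as follows. This is a toy in-memory model in Python, not broker code: `PartitionLog`, `OffsetConflict` and `NO_START_OFFSET` are hypothetical names, validation of the message set is omitted, and the -1 sentinel follows the convention suggested earlier in this thread.

```python
NO_START_OFFSET = -1  # sentinel: let the log pick the next offset, as today


class OffsetConflict(Exception):
    """Raised when the requested start offset is already taken."""


class PartitionLog:
    def __init__(self):
        self.entries = []  # (offset, message) pairs in append order

    def last_offset(self):
        # N in the steps above; -1 for an empty log so that N + 1 == 0
        return self.entries[-1][0] if self.entries else -1

    def append(self, message_set, start_offset=NO_START_OFFSET):
        n = self.last_offset()
        if start_offset == NO_START_OFFSET:
            start_offset = n + 1  # current behaviour
        elif start_offset < n + 1:
            raise OffsetConflict(f"requested {start_offset}, next free is {n + 1}")
        for i, message in enumerate(message_set):
            self.entries.append((start_offset + i, message))
        return start_offset


log = PartitionLog()
print(log.append(["a", "b"]))              # 0
print(log.append(["c"], start_offset=10))  # 10 (a gap is allowed)
print(log.append(["d"]))                   # 11
try:
    log.append(["e"], start_offset=5)
except OffsetConflict as err:
    print("rejected:", err)                # rejected: requested 5, next free is 12
```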


The other alternative is to add a ResetNextOffset(partition, nextOffset) API.
However, since ResetNextOffset changes partition state, that change would have to be persisted on disk and replicated from the leader to the followers.
This would require changes to the on-disk partition format: for example, we could add special “marker” messages meaning “the next message should get offset S”.

So I believe that extending ProduceRequest to accept messageSetStartOffset is easier than introducing ResetNextOffset.


>> let me try and run through a more detailed scenario:
>> 
>> 1. suppose i set up the original cluster (src). no remote cluster yet.
>> let's say over some period of time i produce 1 million msgs to topic X on
>> this src cluster.
>> 2. company grows, 2nd site is opened, dest cluster is created, topic X is
>> created on (brand new) dest cluster.
>> 3. offsets are manually set on every partition of X on the dest cluster to
>> match either the oldest retained or current offset of the matching
>> partition of X in src. in pseudo code:
>> 
>>     for (int partI = 0; partI < numPartitions; partI++) {
>>        long partIOffset;
>>        if (replicateAllRetainedHistory) {
>>           partIOffset = src.getOldestRetained(partI);
>>        } else {
>>           partIOffset = src.getCurrent(partI); // will not copy over history
>>        }
>>        dest.resetStartingOffset(partI, partIOffset); // <---- new mgmt API
>>     }
>> 
>> 4. now you are free to start replicating. under master --> slave
>> assumptions offsets will match from this point forward
>> 
>> seems to me something like this could be made part of the replicator
>> component (mirror maker, or whatever else you want to use) - if topic X
>> does not exist in destination, create it, reset initial offsets to match
>> source, start replication
>> 


Re: custom offsets in ProduceRequest

Posted by radai <ra...@gmail.com>.
or even better - if topic creation is done dynamically by the replicator,
setting the initial offsets for partitions could be made part of topic
creation? even fewer API changes this way


Re: custom offsets in ProduceRequest

Posted by radai <ra...@gmail.com>.
ah, I didn't realize we are limiting the discussion to master --> slave.

but - if we're talking about master-slave replication, and under the
conditions i outlined earlier (src and dest match in #partitions, no
foreign writes to dest) it "just works", seems to me the only thing you're
really missing is not an explicit desired offset param on each and every
request, but just the ability to "reset" the starting offset on the dest
cluster at topic creation.

let me try and run through a more detailed scenario:

1. suppose i set up the original cluster (src). no remote cluster yet. let's
say over some period of time i produce 1 million msgs to topic X on this
src cluster.
2. company grows, 2nd site is opened, dest cluster is created, topic X is
created on (brand new) dest cluster.
3. offsets are manually set on every partition of X on the dest cluster to
match either the oldest retained or current offset of the matching
partition of X in src. in pseudo code:

     for (int partI = 0; partI < numPartitions; partI++) {
        long partIOffset;
        if (replicateAllRetainedHistory) {
           partIOffset = src.getOldestRetained(partI);
        } else {
           partIOffset = src.getCurrent(partI); // will not copy over history
        }
        dest.resetStartingOffset(partI, partIOffset); // <---- new mgmt API
     }

4. now you are free to start replicating. under master --> slave
assumptions offsets will match from this point forward

seems to me something like this could be made part of the replicator
component (mirror maker, or whatever else you want to use) - if topic X
does not exist in destination, create it, reset initial offsets to match
source, start replication


Re: custom offsets in ProduceRequest

Posted by "Andrey L. Neporada" <an...@yandex-team.ru>.
> On 29 Dec 2016, at 20:43, radai <ra...@gmail.com> wrote:
> 
> so, if i follow your suggested logic correctly, there would be some sort of
> :
> 
> produce(partition, msg, requestedOffset)
> 

> which would fail if requestedOffset is already taken (by another previous
> such explicit call or by another regular call that just happened to get
> assigned that offset by the partition leader on the target cluster).
> 

Yes. More formally, my proposal is to extend ProduceRequest by adding MessageSetStartOffset:

ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetStartOffset MessageSetSize MessageSet]]
  RequiredAcks => int16
  Timeout => int32
  Partition => int32
  MessageSetSize => int32
  MessageSetStartOffset => int64

If MessageSetStartOffset is -1, ProduceRequest should work exactly as before, i.e. assign the next available offset to the given MessageSet.
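
As a rough illustration of the per-partition part of that layout, the fields can be packed like this in Python. The sketch assumes big-endian fixed-width integers, as the Kafka wire protocol uses, and treats MessageSet as opaque bytes. The function names are hypothetical, and the enclosing request (RequiredAcks, Timeout, the topic array) is left out.

```python
import struct

# Partition (int32), MessageSetStartOffset (int64), MessageSetSize (int32)
PARTITION_HEADER = struct.Struct(">iqi")
NO_START_OFFSET = -1  # "assign the next available offset"


def encode_partition_data(partition, message_set, start_offset=NO_START_OFFSET):
    """Serialize one partition entry; message_set is already-encoded bytes."""
    header = PARTITION_HEADER.pack(partition, start_offset, len(message_set))
    return header + message_set


def decode_partition_data(buf):
    """Inverse of encode_partition_data for a single entry."""
    partition, start_offset, size = PARTITION_HEADER.unpack_from(buf, 0)
    body_start = PARTITION_HEADER.size
    return partition, start_offset, buf[body_start:body_start + size]


encoded = encode_partition_data(3, b"opaque-message-set", start_offset=1_000_000)
print(len(encoded))                    # 34 (16-byte header + 18-byte body)
print(decode_partition_data(encoded))  # (3, 1000000, b'opaque-message-set')
```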


> how would you meaningfully handle this failure?
> 
> suppose this happens to some cross-cluster replicator (like mirror maker).
> there is no use in retrying. the options would be:
> 
> 1. get the next available offset - which would violate what you're trying to
> achieve
> 2. skip msgs - so replication is incomplete, any offset "already taken" on
> the destination is not replicated from source
> 3. stop replication for this partition completely - because starting from
> now _ALL_ offsets will be taken - 1 foreign msg ruins everything for the
> entire partition.
> 
> none of these options look good to me.
> 
> 

Since we are discussing master-slave replication, the only client writing to the slave cluster is the replicator itself.
In this case a ProduceRequest failure indicates some kind of replication logic error, for example two replication instances somehow launched for a single partition.
The best option here is just to stop the replication process.

So the answer to your question is (3), but this scenario should never happen.
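
In replicator terms, that policy might look like the sketch below. Everything here is hypothetical (the `OffsetConflict` error, the `produce` callable, the fake destination). The only point is that a rejected start offset is treated as fatal for the partition rather than retried or skipped.

```python
class OffsetConflict(Exception):
    """Hypothetical error for a rejected start offset."""


class FakeDestination:
    """Stand-in for the destination partition; tracks the next free offset."""

    def __init__(self, next_offset=0):
        self.next_offset = next_offset

    def produce(self, messages, start_offset):
        if start_offset < self.next_offset:
            raise OffsetConflict(start_offset)
        self.next_offset = start_offset + len(messages)


def replicate_partition(batches, produce):
    """Copy (start_offset, messages) batches; stop at the first conflict.

    Returns (messages_copied, completed).
    """
    copied = 0
    for start_offset, messages in batches:
        try:
            produce(messages, start_offset)
        except OffsetConflict:
            return copied, False  # option (3): stop replicating this partition
        copied += len(messages)
    return copied, True


dest = FakeDestination()
batches = [(0, ["a", "b"]), (2, ["c"]), (1, ["stale"]), (5, ["d"])]
print(replicate_partition(batches, dest.produce))  # (3, False)
```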




Re: custom offsets in ProduceRequest

Posted by radai <ra...@gmail.com>.
so, if i follow your suggested logic correctly, there would be some sort of
:

produce(partition, msg, requestedOffset)

which would fail if requestedOffset is already taken (by another previous
such explicit call or by another regular call that just happened to get
assigned that offset by the partition leader on the target cluster).

how would you meaningfully handle this failure?

suppose this happens to some cross-cluster replicator (like mirror maker).
there is no use in retrying. the options would be:

1. get the next available offset - which would violate what you're trying to
achieve
2. skip msgs - so replication is incomplete, any offset "already taken" on
the destination is not replicated from source
3. stop replication for this partition completely - because starting from
now _ALL_ offsets will be taken - 1 foreign msg ruins everything for the
entire partition.

none of these options look good to me.




Re: custom offsets in ProduceRequest

Posted by "Andrey L. Neporada" <an...@yandex-team.ru>.
Hi!

> On 27 Dec 2016, at 19:35, radai <ra...@gmail.com> wrote:
> 
> IIUC if you replicate from a single source cluster to a single target
> cluster, the topic has the same number of partitions on both, and no one
> writes directly to the target cluster (so master --> slave) the offsets
> would be preserved.
> 

Yes, exactly. When you
1) create the topic with the same number of partitions on both the master and slave clusters
2) write only to the master
3) replicate partition to partition from master to slave
- in this case the offsets will be preserved.

However, you usually already have a working cluster and want to replicate some of its topics to another one.
IMHO, in this scenario there should be a way to make message offsets equal on both clusters.

> but in the general case - how would you handle the case where multiple
> producers "claim" the same offset ?

The same way Kafka handles concurrent produce requests for the same partition: produce requests for a partition are serialized.
If the next produce request “overlaps” with the previous one, it fails.
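
A toy model of that behaviour, assuming producers name an explicit start offset: a lock stands in for the per-partition serialization, and whichever request arrives second finds its range already taken. `SerializedPartition` is an invented name, not a Kafka class.

```python
import threading


class SerializedPartition:
    def __init__(self):
        self._lock = threading.Lock()  # models per-partition serialization
        self.next_offset = 0

    def produce(self, start_offset, count):
        """Return True if the range was accepted, False if it overlaps."""
        with self._lock:
            if start_offset < self.next_offset:
                return False
            self.next_offset = start_offset + count
            return True


partition = SerializedPartition()
results = []


def claim():
    # Both producers "claim" offsets 0..2.
    results.append(partition.produce(start_offset=0, count=3))


threads = [threading.Thread(target=claim) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results))  # [False, True]
```

Which producer wins depends on scheduling, but exactly one of the two overlapping requests is accepted.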


Re: custom offsets in ProduceRequest

Posted by radai <ra...@gmail.com>.
IIUC if you replicate from a single source cluster to a single target
cluster, the topic has the same number of partitions on both, and no one
writes directly to the target cluster (so master --> slave) the offsets
would be preserved.

but in the general case - how would you handle the case where multiple
producers "claim" the same offset ?

