You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Felix GV <fe...@mate1inc.com> on 2012/04/26 19:27:11 UTC

Replication questions

Cool :) Thanks for those insights :) !

I changed the subject of the thread, in order not to derail the original
thread's subject...! I just want to recap to make sure I (and others)
understand all of this correctly :)

So, if I understand correctly, with acks == [0,1] Kafka should provide a
latency that is similar to what we have now, but with the possibility of
losing a small window of unreplicated events in the case of an
unrecoverable hardware failure, and with acks > 1 (or acks == -1) there
will probably be a latency penalty but we will be completely protected from
(non-correlated) hardware failures, right?

Also, I guess the above assumptions are correct for a batch size of 1, and
that bigger batch sizes could also lead to small windows of unwritten data
in cases of failures, just like now...? Although, now that I think of it, I
guess the vulnerability of bigger batch sizes would, again, only come into
play in scenarios of unrecoverable correlated failures, since even if a
machine fails with some partially committed batch, there would be other
machines who received the same data (through replication) and would have
enough time to commit those batches...

Finally, I guess that replication (whatever the ack parameter is) will
affect the overall throughput capacity of the Kafka cluster, since every
node will now be writing its own data as well as the replicated data from
+/- 2 other nodes, right?

--
Felix



On Wed, Apr 25, 2012 at 6:32 PM, Jay Kreps <ja...@gmail.com> wrote:

> Short answer is yes, both async (acks=0 or 1) and sync replication
> (acks > 1) will be both be supported.
>
> -Jay
>
> On Wed, Apr 25, 2012 at 11:22 AM, Jun Rao <ju...@gmail.com> wrote:
> > Felix,
> >
> > Initially, we thought we could keep the option of not sending acks from
> the
> > broker to the producer. However, this seems hard since in the new wire
> > protocol, we need to send at least the error code to the producer (e.g.,
> a
> > request is sent to the wrong broker or wrong partition).
> >
> > So, what we allow in the current design is the following. The producer
> can
> > specify the # of acks in the request. By default (acks = -1), the broker
> > will wait for the message to be written to all replicas that are still
> > synced up with the leader before acking the producer. Otherwise (acks
> >=0),
> > the broker will ack the producer after the message is written to acks
> > replicas. Currently, acks=0 is treated the same as acks=1.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Apr 25, 2012 at 10:39 AM, Felix GV <fe...@mate1inc.com> wrote:
> >
> >> Just curious, but if I remember correctly from the time I read KAFKA-50
> and
> >> the related JIRA issues, you guys plan to implement sync AND async
> >> replication, right?
> >>
> >> --
> >> Felix
> >>
> >>
> >>
> >> On Tue, Apr 24, 2012 at 4:42 PM, Jay Kreps <ja...@gmail.com> wrote:
> >>
> >> > Right now we do sloppy failover. That is when a broker goes down
> >> > traffic is redirected to the remaining machines, but any unconsumed
> >> > messages are stuck on that server until it comes back, if it is
> >> > permanently gone the messages are lost. This is acceptable for us in
> >> > the near-term since our pipeline is pretty real-time so this window
> >> > between production and consumption is pretty small. The complete
> >> > solution is the intra-cluster replication in KAFA-50 which is coming
> >> > along fairly nicely now that we are working on it.
> >> >
> >> > -Jay
> >> >
> >> > On Tue, Apr 24, 2012 at 12:21 PM, Oliver Krohne
> >> > <ol...@googlemail.com> wrote:
> >> > > Hi,
> >> > >
> >> > > indeed I thought could be used as failover approach.
> >> > >
> >> > > We use raid for local redundancy but it does not protect us in case
> of
> >> a
> >> > machine failure, so I am looking for a way to achieve a master/slave
> >> setup
> >> > until KAFKA-50 has been implemented.
> >> > >
> >> > > I think we can solve it for now by having multiple broker so that
> the
> >> > application can continue sending messages if one broker goes down. My
> >> main
> >> > concern is to not introduce a new single point of failure which can
> stop
> >> > the application. However as some consumer are not developed by us and
> it
> >> is
> >> > not clear how they store the offset in zookeeper we need to find out
> how
> >> we
> >> > can manage the consumer in case a broker will never return after a
> >> failure.
> >> > It will be acceptable to lose a couple of messages if a broker dies
> and
> >> the
> >> > consumers have not consumed all messages at the point of failure.
> >> > >
> >> > > Thanks,
> >> > > Oliver
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > Am 23.04.2012 um 19:58 schrieb Jay Kreps:
> >> > >
> >> > >> I think the confusion comes from the fact that we are using
> mirroring
> >> to
> >> > >> handle geographic distribution not failover. If I understand
> correctly
> >> > what
> >> > >> Oliver is asking for is something to give fault tolerance not
> >> something
> >> > for
> >> > >> distribution. I don't think that is really what the mirroring does
> out
> >> > of
> >> > >> the box, though technically i suppose you could just reset the
> offsets
> >> > and
> >> > >> point the consumer at the new cluster and have it start from "now".
> >> > >>
> >> > >> I think it would be helpful to document our use case in the
> mirroring
> >> > docs
> >> > >> since this is not the first time someone has asked about this.
> >> > >>
> >> > >> -Jay
> >> > >>
> >> > >> On Mon, Apr 23, 2012 at 10:38 AM, Joel Koshy <jj...@gmail.com>
> >> > wrote:
> >> > >>
> >> > >>> Hi Oliver,
> >> > >>>
> >> > >>> I was reading the mirroring guide and I wonder if it is required
> that
> >> > the
> >> > >>>> mirror runs it's own zookeeper?
> >> > >>>>
> >> > >>>> We have a zookeeper cluster running which is used by different
> >> > >>>> applications, so can we use that zookeeper cluster for the kafka
> >> > source
> >> > >>> and
> >> > >>>> kafka mirror?
> >> > >>>>
> >> > >>>
> >> > >>> You could have a single zookeeper cluster and use different
> >> namespaces
> >> > for
> >> > >>> the source/target mirror. However, I don't think it is
> recommended to
> >> > use a
> >> > >>> remote zookeeper (if you have a cross-DC set up) since that would
> >> > >>> potentially mean very high ZK latencies on one of your clusters.
> >> > >>>
> >> > >>>
> >> > >>>> What is the procedure if the kafka source server fails to switch
> the
> >> > >>>> applications to use the mirrored instance?
> >> > >>>>
> >> > >>>
> >> > >>> I don't quite follow this question - can you clarify? The mirror
> >> > cluster is
> >> > >>> pretty much a separate instance. There is no built-in automatic
> >> > fail-over
> >> > >>> if your source cluster goes down.
> >> > >>>
> >> > >>>
> >> > >>>> Are there any backup best practices if we would not use
> mirroring?
> >> > >>>>
> >> > >>>
> >> > >>> You can use RAID arrays for (local) data redundancy. You may also
> be
> >> > >>> interested in the (intra-DC) replication feature (KAFKA-50) that
> is
> >> > >>> currently being developed. I believe some folks on this list have
> >> also
> >> > used
> >> > >>> plain rsync's as an alternative to mirroring.
> >> > >>>
> >> > >>> Thanks,
> >> > >>>
> >> > >>> Joel
> >> > >>>
> >> > >
> >> >
> >>
>

Re: Replication questions

Posted by Jay Kreps <ja...@gmail.com>.
It is what we are working on right now, it will be in the next release.

-Jay

On Fri, May 4, 2012 at 3:32 PM, Ashutosh Singh <as...@gmail.com> wrote:
> Is this replication an existing functionality or the new stuff that is
> planned to come?
>
> On Tue, May 1, 2012 at 11:47 AM, Felix GV <fe...@mate1inc.com> wrote:
>
>> Ah, gotcha, so my usage of the term "in-memory replication" can be
>> misleading: Kafka still doesn't retain the data in-app (i.e.: in Kafka's
>> allocated memory), but the data is in-memory nonetheless because of the OS'
>> file system cache.
>>
>> Basically, on the individual node's level, this is not different from what
>> we already have (without KAFKA-50), but the fact that KAFKA-50 will give us
>> replication means that the data will reside in the OS' file system cache of
>> many nodes, giving us much more reliable durability guarantees.
>>
>> Thanks for the nitty gritty details Jay :)
>>
>> --
>> Felix
>>
>>
>>
>> On Tue, May 1, 2012 at 1:51 PM, Jay Kreps <ja...@gmail.com> wrote:
>>
>> > Yes, that is correct. Technically we always immediately write to the
>> > filesystem, it is just a question of when you fsync the file (that is
>> > the slow thing). So though it is in memory it is not in application
>> > memory, so it always survives kill -9 but not unplugging the machine.
>> > Currently when a broker fails messages that are flushed to disk come
>> > back if the broker comes back with an intact filesystem (if the broker
>> > fs is destroyed then it is lost). With replication we retain this same
>> > flexibility on the flush policy, so you can flush every message to
>> > disk immediately if you like, however having the message on multiple
>> > machines is in some ways better durability then the fsync gives, as
>> > the message will survive destruction of the filesystem, so we think
>> > you can legitimately allow consumers to consume messages independent
>> > of the flush policy.
>> >
>> > Also when a broker fails it will lose unflushed messages, however when
>> > it comes back to life it will restore these messages from the other
>> > replicas before it will serve data to consumers. So the log will be
>> > byte-for-byte identical across all servers including both the contents
>> > and the ordering of messages.
>> >
>> > -Jay
>> >
>> > On Tue, May 1, 2012 at 9:24 AM, Felix GV <fe...@mate1inc.com> wrote:
>> > > Hmm... interesting!
>> > >
>> > > So, if I understanding correctly, what you're saying regarding point 2,
>> > is
>> > > that the messages are going to be kept in memory on several nodes, and
>> > > start being served to consumers as soon as this is completed, rather
>> than
>> > > after the data is flushed to disk? This way, we still benefit from the
>> > > throughput gain of flushing data to disk in batches, but we consider
>> that
>> > > the added durability of having in-memory replication is good enough to
>> > > start serving that data to consumers sooner.
>> > >
>> > > Furthermore, this means that in the unlikely event that several nodes
>> > would
>> > > fail simultaneously (a correlated failure), the data that is replicated
>> > to
>> > > the failed nodes but not yet flushed on any of them would be lost.
>> > However,
>> > > when a single node crashes and is then restarted, only the failed node
>> > will
>> > > have lost its unflushed data, while the other nodes that had replicated
>> > > that data will have had the opportunity to flush it to disk later on.
>> > >
>> > > Sorry if I'm repeating like a parrot. I just want to make sure I
>> > understand
>> > > correctly :)
>> > >
>> > > Please correct me if I'm not interpreting this correctly!
>> > >
>> > > --
>> > > Felix
>> > >
>> > >
>> > >
>> > > On Mon, Apr 30, 2012 at 5:59 PM, Jay Kreps <ja...@gmail.com>
>> wrote:
>> > >
>> > >> Yes, it is also worth noting that there are couple of different ways
>> > >> to think about latency:
>> > >> 1. latency of the request from the producer's point-of-view
>> > >> 2. end-to-end latency to the consumer
>> > >>
>> > >> As Jun mentions (1) may go up a little because the producer was
>> > >> sending data without checking for any answer from the server. Although
>> > >> this gives a nice buffering effect it leads to a number of corner
>> > >> cases that are hard to deal with correctly. It should be the case that
>> > >> setting the consumer to async has the same effect from the producer
>> > >> point of view without the corner cases of having no RPC response to
>> > >> convey errors and other broker misbehavior.
>> > >>
>> > >> (2) May actually get significantly better, especially for lower volume
>> > >> topics. The reason for this is because currently we wait until data is
>> > >> flushed to disk before giving it to the consumer, this flush policy is
>> > >> controlled by setting a number of messages or timeout at which the
>> > >> flush is forced. The reason to configure this is because on
>> > >> traditional disks each disk is likely to incur at least one seek. In
>> > >> the new model replication can take the place of waiting on a disk
>> > >> flush to provide durability (even if the log of the local server loses
>> > >> unflushed data as long as all servers don't crash at the same time no
>> > >> messages will be lost). Doing 2 parallel replication round-trips
>> > >> (perhaps surprisingly) looks like it may be a lot lower-latency than
>> > >> doing a local disk flush (< 1ms versus >= 10ms). In our own usage
>> > >> desire for this kind of low-latency consumption is not common, but I
>> > >> understand that this is a common need for messaging.
>> > >>
>> > >> -Jay
>> > >>
>> > >> On Thu, Apr 26, 2012 at 2:03 PM, Felix GV <fe...@mate1inc.com> wrote:
>> > >> > Thanks Jun :)
>> > >> >
>> > >> > --
>> > >> > Felix
>> > >> >
>> > >> >
>> > >> >
>> > >> > On Thu, Apr 26, 2012 at 3:26 PM, Jun Rao <ju...@gmail.com> wrote:
>> > >> >
>> > >> >> Some comments inlined below.
>> > >> >>
>> > >> >> Thanks,
>> > >> >>
>> > >> >> Jun
>> > >> >>
>> > >> >> On Thu, Apr 26, 2012 at 10:27 AM, Felix GV <fe...@mate1inc.com>
>> > wrote:
>> > >> >>
>> > >> >> > Cool :) Thanks for those insights :) !
>> > >> >> >
>> > >> >> > I changed the subject of the thread, in order not to derail the
>> > >> original
>> > >> >> > thread's subject...! I just want to recap to make sure I (and
>> > others)
>> > >> >> > understand all of this correctly :)
>> > >> >> >
>> > >> >> > So, if I understand correctly, with acks == [0,1] Kafka should
>> > >> provide a
>> > >> >> > latency that is similar to what we have now, but with the
>> > possibility
>> > >> of
>> > >> >> > losing a small window of unreplicated events in the case of an
>> > >> >> > unrecoverable hardware failure, and with acks > 1 (or acks == -1)
>> > >> there
>> > >> >> > will probably be a latency penalty but we will be completely
>> > protected
>> > >> >> from
>> > >> >> > (non-correlated) hardware failures, right?
>> > >> >> >
>> > >> >> > This is mostly true. The difference is that in 0.7, producer
>> > doesn't
>> > >> wait
>> > >> >> for a TCP response from broker. In 0.8, the producer always waits
>> > for a
>> > >> >> response from broker. How quickly the broker sends the response
>> > depends
>> > >> on
>> > >> >> acks. If acks is less than ideal, you may get the response faster,
>> > but
>> > >> have
>> > >> >> some risk of losing the data if there is broker failure.
>> > >> >>
>> > >> >>
>> > >> >> > Also, I guess the above assumptions are correct for a batch size
>> > of 1,
>> > >> >> and
>> > >> >> > that bigger batch sizes could also lead to small windows of
>> > unwritten
>> > >> >> data
>> > >> >> > in cases of failures, just like now...? Although, now that I
>> think
>> > of
>> > >> >> it, I
>> > >> >> > guess the vulnerability of bigger batch sizes would, again, only
>> > come
>> > >> >> into
>> > >> >> > play in scenarios of unrecoverable correlated failures, since
>> even
>> > if
>> > >> a
>> > >> >> > machine fails with some partially committed batch, there would be
>> > >> other
>> > >> >> > machines who received the same data (through replication) and
>> would
>> > >> have
>> > >> >> > enough time to commit those batches...
>> > >> >> >
>> > >> >> > I want to add that if the producer itself dies, it could lose a
>> > batch
>> > >> of
>> > >> >> events.
>> > >> >>
>> > >> >>
>> > >> >> > Finally, I guess that replication (whatever the ack parameter is)
>> > will
>> > >> >> > affect the overall throughput capacity of the Kafka cluster,
>> since
>> > >> every
>> > >> >> > node will now be writing its own data as well as the replicated
>> > data
>> > >> from
>> > >> >> > +/- 2 other nodes, right?
>> > >> >> >
>> > >> >> > --
>> > >> >> > Felix
>> > >> >> >
>> > >> >> >
>> > >> >> >
>> > >> >> > On Wed, Apr 25, 2012 at 6:32 PM, Jay Kreps <ja...@gmail.com>
>> > >> wrote:
>> > >> >> >
>> > >> >> > > Short answer is yes, both async (acks=0 or 1) and sync
>> > replication
>> > >> >> > > (acks > 1) will be both be supported.
>> > >> >> > >
>> > >> >> > > -Jay
>> > >> >> > >
>> > >> >> > > On Wed, Apr 25, 2012 at 11:22 AM, Jun Rao <ju...@gmail.com>
>> > wrote:
>> > >> >> > > > Felix,
>> > >> >> > > >
>> > >> >> > > > Initially, we thought we could keep the option of not sending
>> > acks
>> > >> >> from
>> > >> >> > > the
>> > >> >> > > > broker to the producer. However, this seems hard since in the
>> > new
>> > >> >> wire
>> > >> >> > > > protocol, we need to send at least the error code to the
>> > producer
>> > >> >> > (e.g.,
>> > >> >> > > a
>> > >> >> > > > request is sent to the wrong broker or wrong partition).
>> > >> >> > > >
>> > >> >> > > > So, what we allow in the current design is the following. The
>> > >> >> producer
>> > >> >> > > can
>> > >> >> > > > specify the # of acks in the request. By default (acks = -1),
>> > the
>> > >> >> > broker
>> > >> >> > > > will wait for the message to be written to all replicas that
>> > are
>> > >> >> still
>> > >> >> > > > synced up with the leader before acking the producer.
>> Otherwise
>> > >> (acks
>> > >> >> > > >=0),
>> > >> >> > > > the broker will ack the producer after the message is written
>> > to
>> > >> acks
>> > >> >> > > > replicas. Currently, acks=0 is treated the same as acks=1.
>> > >> >> > > >
>> > >> >> > > > Thanks,
>> > >> >> > > >
>> > >> >> > > > Jun
>> > >> >> > > >
>> > >> >> > > > On Wed, Apr 25, 2012 at 10:39 AM, Felix GV <
>> felix@mate1inc.com
>> > >
>> > >> >> wrote:
>> > >> >> > > >
>> > >> >> > > >> Just curious, but if I remember correctly from the time I
>> read
>> > >> >> > KAFKA-50
>> > >> >> > > and
>> > >> >> > > >> the related JIRA issues, you guys plan to implement sync AND
>> > >> async
>> > >> >> > > >> replication, right?
>> > >> >> > > >>
>> > >> >> > > >> --
>> > >> >> > > >> Felix
>> > >> >> > > >>
>> > >> >> > > >>
>> > >> >> > > >>
>> > >> >> > > >> On Tue, Apr 24, 2012 at 4:42 PM, Jay Kreps <
>> > jay.kreps@gmail.com>
>> > >> >> > wrote:
>> > >> >> > > >>
>> > >> >> > > >> > Right now we do sloppy failover. That is when a broker
>> goes
>> > >> down
>> > >> >> > > >> > traffic is redirected to the remaining machines, but any
>> > >> >> unconsumed
>> > >> >> > > >> > messages are stuck on that server until it comes back, if
>> > it is
>> > >> >> > > >> > permanently gone the messages are lost. This is acceptable
>> > for
>> > >> us
>> > >> >> in
>> > >> >> > > >> > the near-term since our pipeline is pretty real-time so
>> this
>> > >> >> window
>> > >> >> > > >> > between production and consumption is pretty small. The
>> > >> complete
>> > >> >> > > >> > solution is the intra-cluster replication in KAFA-50 which
>> > is
>> > >> >> coming
>> > >> >> > > >> > along fairly nicely now that we are working on it.
>> > >> >> > > >> >
>> > >> >> > > >> > -Jay
>> > >> >> > > >> >
>> > >> >> > > >> > On Tue, Apr 24, 2012 at 12:21 PM, Oliver Krohne
>> > >> >> > > >> > <ol...@googlemail.com> wrote:
>> > >> >> > > >> > > Hi,
>> > >> >> > > >> > >
>> > >> >> > > >> > > indeed I thought could be used as failover approach.
>> > >> >> > > >> > >
>> > >> >> > > >> > > We use raid for local redundancy but it does not protect
>> > us
>> > >> in
>> > >> >> > case
>> > >> >> > > of
>> > >> >> > > >> a
>> > >> >> > > >> > machine failure, so I am looking for a way to achieve a
>> > >> >> master/slave
>> > >> >> > > >> setup
>> > >> >> > > >> > until KAFKA-50 has been implemented.
>> > >> >> > > >> > >
>> > >> >> > > >> > > I think we can solve it for now by having multiple
>> broker
>> > so
>> > >> >> that
>> > >> >> > > the
>> > >> >> > > >> > application can continue sending messages if one broker
>> goes
>> > >> down.
>> > >> >> > My
>> > >> >> > > >> main
>> > >> >> > > >> > concern is to not introduce a new single point of failure
>> > which
>> > >> >> can
>> > >> >> > > stop
>> > >> >> > > >> > the application. However as some consumer are not
>> developed
>> > by
>> > >> us
>> > >> >> > and
>> > >> >> > > it
>> > >> >> > > >> is
>> > >> >> > > >> > not clear how they store the offset in zookeeper we need
>> to
>> > >> find
>> > >> >> out
>> > >> >> > > how
>> > >> >> > > >> we
>> > >> >> > > >> > can manage the consumer in case a broker will never return
>> > >> after a
>> > >> >> > > >> failure.
>> > >> >> > > >> > It will be acceptable to lose a couple of messages if a
>> > broker
>> > >> >> dies
>> > >> >> > > and
>> > >> >> > > >> the
>> > >> >> > > >> > consumers have not consumed all messages at the point of
>> > >> failure.
>> > >> >> > > >> > >
>> > >> >> > > >> > > Thanks,
>> > >> >> > > >> > > Oliver
>> > >> >> > > >> > >
>> > >> >> > > >> > >
>> > >> >> > > >> > >
>> > >> >> > > >> > >
>> > >> >> > > >> > > Am 23.04.2012 um 19:58 schrieb Jay Kreps:
>> > >> >> > > >> > >
>> > >> >> > > >> > >> I think the confusion comes from the fact that we are
>> > using
>> > >> >> > > mirroring
>> > >> >> > > >> to
>> > >> >> > > >> > >> handle geographic distribution not failover. If I
>> > understand
>> > >> >> > > correctly
>> > >> >> > > >> > what
>> > >> >> > > >> > >> Oliver is asking for is something to give fault
>> tolerance
>> > >> not
>> > >> >> > > >> something
>> > >> >> > > >> > for
>> > >> >> > > >> > >> distribution. I don't think that is really what the
>> > >> mirroring
>> > >> >> > does
>> > >> >> > > out
>> > >> >> > > >> > of
>> > >> >> > > >> > >> the box, though technically i suppose you could just
>> > reset
>> > >> the
>> > >> >> > > offsets
>> > >> >> > > >> > and
>> > >> >> > > >> > >> point the consumer at the new cluster and have it start
>> > from
>> > >> >> > "now".
>> > >> >> > > >> > >>
>> > >> >> > > >> > >> I think it would be helpful to document our use case in
>> > the
>> > >> >> > > mirroring
>> > >> >> > > >> > docs
>> > >> >> > > >> > >> since this is not the first time someone has asked
>> about
>> > >> this.
>> > >> >> > > >> > >>
>> > >> >> > > >> > >> -Jay
>> > >> >> > > >> > >>
>> > >> >> > > >> > >> On Mon, Apr 23, 2012 at 10:38 AM, Joel Koshy <
>> > >> >> > jjkoshy.w@gmail.com>
>> > >> >> > > >> > wrote:
>> > >> >> > > >> > >>
>> > >> >> > > >> > >>> Hi Oliver,
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>> I was reading the mirroring guide and I wonder if it
>> is
>> > >> >> required
>> > >> >> > > that
>> > >> >> > > >> > the
>> > >> >> > > >> > >>>> mirror runs it's own zookeeper?
>> > >> >> > > >> > >>>>
>> > >> >> > > >> > >>>> We have a zookeeper cluster running which is used by
>> > >> >> different
>> > >> >> > > >> > >>>> applications, so can we use that zookeeper cluster
>> for
>> > the
>> > >> >> > kafka
>> > >> >> > > >> > source
>> > >> >> > > >> > >>> and
>> > >> >> > > >> > >>>> kafka mirror?
>> > >> >> > > >> > >>>>
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>> You could have a single zookeeper cluster and use
>> > different
>> > >> >> > > >> namespaces
>> > >> >> > > >> > for
>> > >> >> > > >> > >>> the source/target mirror. However, I don't think it is
>> > >> >> > > recommended to
>> > >> >> > > >> > use a
>> > >> >> > > >> > >>> remote zookeeper (if you have a cross-DC set up) since
>> > that
>> > >> >> > would
>> > >> >> > > >> > >>> potentially mean very high ZK latencies on one of your
>> > >> >> clusters.
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>>> What is the procedure if the kafka source server
>> fails
>> > to
>> > >> >> > switch
>> > >> >> > > the
>> > >> >> > > >> > >>>> applications to use the mirrored instance?
>> > >> >> > > >> > >>>>
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>> I don't quite follow this question - can you clarify?
>> > The
>> > >> >> mirror
>> > >> >> > > >> > cluster is
>> > >> >> > > >> > >>> pretty much a separate instance. There is no built-in
>> > >> >> automatic
>> > >> >> > > >> > fail-over
>> > >> >> > > >> > >>> if your source cluster goes down.
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>>> Are there any backup best practices if we would not
>> use
>> > >> >> > > mirroring?
>> > >> >> > > >> > >>>>
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>> You can use RAID arrays for (local) data redundancy.
>> You
>> > >> may
>> > >> >> > also
>> > >> >> > > be
>> > >> >> > > >> > >>> interested in the (intra-DC) replication feature
>> > (KAFKA-50)
>> > >> >> that
>> > >> >> > > is
>> > >> >> > > >> > >>> currently being developed. I believe some folks on
>> this
>> > >> list
>> > >> >> > have
>> > >> >> > > >> also
>> > >> >> > > >> > used
>> > >> >> > > >> > >>> plain rsync's as an alternative to mirroring.
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>> Thanks,
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >>> Joel
>> > >> >> > > >> > >>>
>> > >> >> > > >> > >
>> > >> >> > > >> >
>> > >> >> > > >>
>> > >> >> > >
>> > >> >> >
>> > >> >>
>> > >>
>> >
>>

Re: Replication questions

Posted by Ashutosh Singh <as...@gmail.com>.
Is this replication an existing functionality or the new stuff that is
planned to come?

On Tue, May 1, 2012 at 11:47 AM, Felix GV <fe...@mate1inc.com> wrote:

> Ah, gotcha, so my usage of the term "in-memory replication" can be
> misleading: Kafka still doesn't retain the data in-app (i.e.: in Kafka's
> allocated memory), but the data is in-memory nonetheless because of the OS'
> file system cache.
>
> Basically, on the individual node's level, this is not different from what
> we already have (without KAFKA-50), but the fact that KAFKA-50 will give us
> replication means that the data will reside in the OS' file system cache of
> many nodes, giving us much more reliable durability guarantees.
>
> Thanks for the nitty gritty details Jay :)
>
> --
> Felix
>
>
>
> On Tue, May 1, 2012 at 1:51 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Yes, that is correct. Technically we always immediately write to the
> > filesystem, it is just a question of when you fsync the file (that is
> > the slow thing). So though it is in memory it is not in application
> > memory, so it always survives kill -9 but not unplugging the machine.
> > Currently when a broker fails messages that are flushed to disk come
> > back if the broker comes back with an intact filesystem (if the broker
> > fs is destroyed then it is lost). With replication we retain this same
> > flexibility on the flush policy, so you can flush every message to
> > disk immediately if you like, however having the message on multiple
> > machines is in some ways better durability then the fsync gives, as
> > the message will survive destruction of the filesystem, so we think
> > you can legitimately allow consumers to consume messages independent
> > of the flush policy.
> >
> > Also when a broker fails it will lose unflushed messages, however when
> > it comes back to life it will restore these messages from the other
> > replicas before it will serve data to consumers. So the log will be
> > byte-for-byte identical across all servers including both the contents
> > and the ordering of messages.
> >
> > -Jay
> >
> > On Tue, May 1, 2012 at 9:24 AM, Felix GV <fe...@mate1inc.com> wrote:
> > > Hmm... interesting!
> > >
> > > So, if I understanding correctly, what you're saying regarding point 2,
> > is
> > > that the messages are going to be kept in memory on several nodes, and
> > > start being served to consumers as soon as this is completed, rather
> than
> > > after the data is flushed to disk? This way, we still benefit from the
> > > throughput gain of flushing data to disk in batches, but we consider
> that
> > > the added durability of having in-memory replication is good enough to
> > > start serving that data to consumers sooner.
> > >
> > > Furthermore, this means that in the unlikely event that several nodes
> > would
> > > fail simultaneously (a correlated failure), the data that is replicated
> > to
> > > the failed nodes but not yet flushed on any of them would be lost.
> > However,
> > > when a single node crashes and is then restarted, only the failed node
> > will
> > > have lost its unflushed data, while the other nodes that had replicated
> > > that data will have had the opportunity to flush it to disk later on.
> > >
> > > Sorry if I'm repeating like a parrot. I just want to make sure I
> > understand
> > > correctly :)
> > >
> > > Please correct me if I'm not interpreting this correctly!
> > >
> > > --
> > > Felix
> > >
> > >
> > >
> > > On Mon, Apr 30, 2012 at 5:59 PM, Jay Kreps <ja...@gmail.com>
> wrote:
> > >
> > >> Yes, it is also worth noting that there are couple of different ways
> > >> to think about latency:
> > >> 1. latency of the request from the producer's point-of-view
> > >> 2. end-to-end latency to the consumer
> > >>
> > >> As Jun mentions (1) may go up a little because the producer was
> > >> sending data without checking for any answer from the server. Although
> > >> this gives a nice buffering effect it leads to a number of corner
> > >> cases that are hard to deal with correctly. It should be the case that
> > >> setting the consumer to async has the same effect from the producer
> > >> point of view without the corner cases of having no RPC response to
> > >> convey errors and other broker misbehavior.
> > >>
> > >> (2) May actually get significantly better, especially for lower volume
> > >> topics. The reason for this is because currently we wait until data is
> > >> flushed to disk before giving it to the consumer, this flush policy is
> > >> controlled by setting a number of messages or timeout at which the
> > >> flush is forced. The reason to configure this is because on
> > >> traditional disks each disk is likely to incur at least one seek. In
> > >> the new model replication can take the place of waiting on a disk
> > >> flush to provide durability (even if the log of the local server loses
> > >> unflushed data as long as all servers don't crash at the same time no
> > >> messages will be lost). Doing 2 parallel replication round-trips
> > >> (perhaps surprisingly) looks like it may be a lot lower-latency than
> > >> doing a local disk flush (< 1ms versus >= 10ms). In our own usage
> > >> desire for this kind of low-latency consumption is not common, but I
> > >> understand that this is a common need for messaging.
> > >>
> > >> -Jay
> > >>
> > >> On Thu, Apr 26, 2012 at 2:03 PM, Felix GV <fe...@mate1inc.com> wrote:
> > >> > Thanks Jun :)
> > >> >
> > >> > --
> > >> > Felix
> > >> >
> > >> >
> > >> >
> > >> > On Thu, Apr 26, 2012 at 3:26 PM, Jun Rao <ju...@gmail.com> wrote:
> > >> >
> > >> >> Some comments inlined below.
> > >> >>
> > >> >> Thanks,
> > >> >>
> > >> >> Jun
> > >> >>
> > >> >> On Thu, Apr 26, 2012 at 10:27 AM, Felix GV <fe...@mate1inc.com>
> > wrote:
> > >> >>
> > >> >> > Cool :) Thanks for those insights :) !
> > >> >> >
> > >> >> > I changed the subject of the thread, in order not to derail the
> > >> original
> > >> >> > thread's subject...! I just want to recap to make sure I (and
> > others)
> > >> >> > understand all of this correctly :)
> > >> >> >
> > >> >> > So, if I understand correctly, with acks == [0,1] Kafka should
> > >> provide a
> > >> >> > latency that is similar to what we have now, but with the
> > possibility
> > >> of
> > >> >> > losing a small window of unreplicated events in the case of an
> > >> >> > unrecoverable hardware failure, and with acks > 1 (or acks == -1)
> > >> there
> > >> >> > will probably be a latency penalty but we will be completely
> > protected
> > >> >> from
> > >> >> > (non-correlated) hardware failures, right?
> > >> >> >
> > >> >> > This is mostly true. The difference is that in 0.7, producer
> > doesn't
> > >> wait
> > >> >> for a TCP response from broker. In 0.8, the producer always waits
> > for a
> > >> >> response from broker. How quickly the broker sends the response
> > depends
> > >> on
> > >> >> acks. If acks is less than ideal, you may get the response faster,
> > but
> > >> have
> > >> >> some risk of losing the data if there is broker failure.
> > >> >>
> > >> >>
> > >> >> > Also, I guess the above assumptions are correct for a batch size
> > of 1,
> > >> >> and
> > >> >> > that bigger batch sizes could also lead to small windows of
> > unwritten
> > >> >> data
> > >> >> > in cases of failures, just like now...? Although, now that I
> think
> > of
> > >> >> it, I
> > >> >> > guess the vulnerability of bigger batch sizes would, again, only
> > come
> > >> >> into
> > >> >> > play in scenarios of unrecoverable correlated failures, since
> even
> > if
> > >> a
> > >> >> > machine fails with some partially committed batch, there would be
> > >> other
> > >> >> > machines who received the same data (through replication) and
> would
> > >> have
> > >> >> > enough time to commit those batches...
> > >> >> >
> > >> >> > I want to add that if the producer itself dies, it could lose a
> > batch
> > >> of
> > >> >> events.
> > >> >>
> > >> >>
> > >> >> > Finally, I guess that replication (whatever the ack parameter is)
> > will
> > >> >> > affect the overall throughput capacity of the Kafka cluster,
> since
> > >> every
> > >> >> > node will now be writing its own data as well as the replicated
> > data
> > >> from
> > >> >> > +/- 2 other nodes, right?
> > >> >> >
> > >> >> > --
> > >> >> > Felix
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> > On Wed, Apr 25, 2012 at 6:32 PM, Jay Kreps <ja...@gmail.com>
> > >> wrote:
> > >> >> >
> > >> >> > > Short answer is yes, both async (acks=0 or 1) and sync
> > replication
> > >> >> > > (acks > 1) will be both be supported.
> > >> >> > >
> > >> >> > > -Jay
> > >> >> > >
> > >> >> > > On Wed, Apr 25, 2012 at 11:22 AM, Jun Rao <ju...@gmail.com>
> > wrote:
> > >> >> > > > Felix,
> > >> >> > > >
> > >> >> > > > Initially, we thought we could keep the option of not sending
> > acks
> > >> >> from
> > >> >> > > the
> > >> >> > > > broker to the producer. However, this seems hard since in the
> > new
> > >> >> wire
> > >> >> > > > protocol, we need to send at least the error code to the
> > producer
> > >> >> > (e.g.,
> > >> >> > > a
> > >> >> > > > request is sent to the wrong broker or wrong partition).
> > >> >> > > >
> > >> >> > > > So, what we allow in the current design is the following. The
> > >> >> producer
> > >> >> > > can
> > >> >> > > > specify the # of acks in the request. By default (acks = -1),
> > the
> > >> >> > broker
> > >> >> > > > will wait for the message to be written to all replicas that
> > are
> > >> >> still
> > >> >> > > > synced up with the leader before acking the producer.
> Otherwise
> > >> (acks
> > >> >> > > >=0),
> > >> >> > > > the broker will ack the producer after the message is written
> > to
> > >> acks
> > >> >> > > > replicas. Currently, acks=0 is treated the same as acks=1.
> > >> >> > > >
> > >> >> > > > Thanks,
> > >> >> > > >
> > >> >> > > > Jun
> > >> >> > > >
> > >> >> > > > On Wed, Apr 25, 2012 at 10:39 AM, Felix GV <
> felix@mate1inc.com
> > >
> > >> >> wrote:
> > >> >> > > >
> > >> >> > > >> Just curious, but if I remember correctly from the time I
> read
> > >> >> > KAFKA-50
> > >> >> > > and
> > >> >> > > >> the related JIRA issues, you guys plan to implement sync AND
> > >> async
> > >> >> > > >> replication, right?
> > >> >> > > >>
> > >> >> > > >> --
> > >> >> > > >> Felix
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >> On Tue, Apr 24, 2012 at 4:42 PM, Jay Kreps <
> > jay.kreps@gmail.com>
> > >> >> > wrote:
> > >> >> > > >>
> > >> >> > > >> > Right now we do sloppy failover. That is when a broker
> goes
> > >> down
> > >> >> > > >> > traffic is redirected to the remaining machines, but any
> > >> >> unconsumed
> > >> >> > > >> > messages are stuck on that server until it comes back, if
> > it is
> > >> >> > > >> > permanently gone the messages are lost. This is acceptable
> > for
> > >> us
> > >> >> in
> > >> >> > > >> > the near-term since our pipeline is pretty real-time so
> this
> > >> >> window
> > >> >> > > >> > between production and consumption is pretty small. The
> > >> complete
> > >> >> > > >> > solution is the intra-cluster replication in KAFA-50 which
> > is
> > >> >> coming
> > >> >> > > >> > along fairly nicely now that we are working on it.
> > >> >> > > >> >
> > >> >> > > >> > -Jay
> > >> >> > > >> >
> > >> >> > > >> > On Tue, Apr 24, 2012 at 12:21 PM, Oliver Krohne
> > >> >> > > >> > <ol...@googlemail.com> wrote:
> > >> >> > > >> > > Hi,
> > >> >> > > >> > >
> > >> >> > > >> > > indeed I thought could be used as failover approach.
> > >> >> > > >> > >
> > >> >> > > >> > > We use raid for local redundancy but it does not protect
> > us
> > >> in
> > >> >> > case
> > >> >> > > of
> > >> >> > > >> a
> > >> >> > > >> > machine failure, so I am looking for a way to achieve a
> > >> >> master/slave
> > >> >> > > >> setup
> > >> >> > > >> > until KAFKA-50 has been implemented.
> > >> >> > > >> > >
> > >> >> > > >> > > I think we can solve it for now by having multiple
> broker
> > so
> > >> >> that
> > >> >> > > the
> > >> >> > > >> > application can continue sending messages if one broker
> goes
> > >> down.
> > >> >> > My
> > >> >> > > >> main
> > >> >> > > >> > concern is to not introduce a new single point of failure
> > which
> > >> >> can
> > >> >> > > stop
> > >> >> > > >> > the application. However as some consumer are not
> developed
> > by
> > >> us
> > >> >> > and
> > >> >> > > it
> > >> >> > > >> is
> > >> >> > > >> > not clear how they store the offset in zookeeper we need
> to
> > >> find
> > >> >> out
> > >> >> > > how
> > >> >> > > >> we
> > >> >> > > >> > can manage the consumer in case a broker will never return
> > >> after a
> > >> >> > > >> failure.
> > >> >> > > >> > It will be acceptable to lose a couple of messages if a
> > broker
> > >> >> dies
> > >> >> > > and
> > >> >> > > >> the
> > >> >> > > >> > consumers have not consumed all messages at the point of
> > >> failure.
> > >> >> > > >> > >
> > >> >> > > >> > > Thanks,
> > >> >> > > >> > > Oliver
> > >> >> > > >> > >
> > >> >> > > >> > >
> > >> >> > > >> > >
> > >> >> > > >> > >
> > >> >> > > >> > > Am 23.04.2012 um 19:58 schrieb Jay Kreps:
> > >> >> > > >> > >
> > >> >> > > >> > >> I think the confusion comes from the fact that we are
> > using
> > >> >> > > mirroring
> > >> >> > > >> to
> > >> >> > > >> > >> handle geographic distribution not failover. If I
> > understand
> > >> >> > > correctly
> > >> >> > > >> > what
> > >> >> > > >> > >> Oliver is asking for is something to give fault
> tolerance
> > >> not
> > >> >> > > >> something
> > >> >> > > >> > for
> > >> >> > > >> > >> distribution. I don't think that is really what the
> > >> mirroring
> > >> >> > does
> > >> >> > > out
> > >> >> > > >> > of
> > >> >> > > >> > >> the box, though technically i suppose you could just
> > reset
> > >> the
> > >> >> > > offsets
> > >> >> > > >> > and
> > >> >> > > >> > >> point the consumer at the new cluster and have it start
> > from
> > >> >> > "now".
> > >> >> > > >> > >>
> > >> >> > > >> > >> I think it would be helpful to document our use case in
> > the
> > >> >> > > mirroring
> > >> >> > > >> > docs
> > >> >> > > >> > >> since this is not the first time someone has asked
> about
> > >> this.
> > >> >> > > >> > >>
> > >> >> > > >> > >> -Jay
> > >> >> > > >> > >>
> > >> >> > > >> > >> On Mon, Apr 23, 2012 at 10:38 AM, Joel Koshy <
> > >> >> > jjkoshy.w@gmail.com>
> > >> >> > > >> > wrote:
> > >> >> > > >> > >>
> > >> >> > > >> > >>> Hi Oliver,
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> I was reading the mirroring guide and I wonder if it
> is
> > >> >> required
> > >> >> > > that
> > >> >> > > >> > the
> > >> >> > > >> > >>>> mirror runs it's own zookeeper?
> > >> >> > > >> > >>>>
> > >> >> > > >> > >>>> We have a zookeeper cluster running which is used by
> > >> >> different
> > >> >> > > >> > >>>> applications, so can we use that zookeeper cluster
> for
> > the
> > >> >> > kafka
> > >> >> > > >> > source
> > >> >> > > >> > >>> and
> > >> >> > > >> > >>>> kafka mirror?
> > >> >> > > >> > >>>>
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> You could have a single zookeeper cluster and use
> > different
> > >> >> > > >> namespaces
> > >> >> > > >> > for
> > >> >> > > >> > >>> the source/target mirror. However, I don't think it is
> > >> >> > > recommended to
> > >> >> > > >> > use a
> > >> >> > > >> > >>> remote zookeeper (if you have a cross-DC set up) since
> > that
> > >> >> > would
> > >> >> > > >> > >>> potentially mean very high ZK latencies on one of your
> > >> >> clusters.
> > >> >> > > >> > >>>
> > >> >> > > >> > >>>
> > >> >> > > >> > >>>> What is the procedure if the kafka source server
> fails
> > to
> > >> >> > switch
> > >> >> > > the
> > >> >> > > >> > >>>> applications to use the mirrored instance?
> > >> >> > > >> > >>>>
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> I don't quite follow this question - can you clarify?
> > The
> > >> >> mirror
> > >> >> > > >> > cluster is
> > >> >> > > >> > >>> pretty much a separate instance. There is no built-in
> > >> >> automatic
> > >> >> > > >> > fail-over
> > >> >> > > >> > >>> if your source cluster goes down.
> > >> >> > > >> > >>>
> > >> >> > > >> > >>>
> > >> >> > > >> > >>>> Are there any backup best practices if we would not
> use
> > >> >> > > mirroring?
> > >> >> > > >> > >>>>
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> You can use RAID arrays for (local) data redundancy.
> You
> > >> may
> > >> >> > also
> > >> >> > > be
> > >> >> > > >> > >>> interested in the (intra-DC) replication feature
> > (KAFKA-50)
> > >> >> that
> > >> >> > > is
> > >> >> > > >> > >>> currently being developed. I believe some folks on
> this
> > >> list
> > >> >> > have
> > >> >> > > >> also
> > >> >> > > >> > used
> > >> >> > > >> > >>> plain rsync's as an alternative to mirroring.
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> Thanks,
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> Joel
> > >> >> > > >> > >>>
> > >> >> > > >> > >
> > >> >> > > >> >
> > >> >> > > >>
> > >> >> > >
> > >> >> >
> > >> >>
> > >>
> >
>

Re: Replication questions

Posted by Felix GV <fe...@mate1inc.com>.
Ah, gotcha, so my usage of the term "in-memory replication" can be
misleading: Kafka still doesn't retain the data in-app (i.e.: in Kafka's
allocated memory), but the data is in-memory nonetheless because of the OS'
file system cache.

Basically, on the individual node's level, this is not different from what
we already have (without KAFKA-50), but the fact that KAFKA-50 will give us
replication means that the data will reside in the OS' file system cache of
many nodes, giving us much more reliable durability guarantees.

Thanks for the nitty gritty details Jay :)

--
Felix



On Tue, May 1, 2012 at 1:51 PM, Jay Kreps <ja...@gmail.com> wrote:

> Yes, that is correct. Technically we always immediately write to the
> filesystem, it is just a question of when you fsync the file (that is
> the slow thing). So though it is in memory it is not in application
> memory, so it always survives kill -9 but not unplugging the machine.
> Currently when a broker fails messages that are flushed to disk come
> back if the broker comes back with an intact filesystem (if the broker
> fs is destroyed then it is lost). With replication we retain this same
> flexibility on the flush policy, so you can flush every message to
> disk immediately if you like, however having the message on multiple
> machines is in some ways better durability then the fsync gives, as
> the message will survive destruction of the filesystem, so we think
> you can legitimately allow consumers to consume messages independent
> of the flush policy.
>
> Also when a broker fails it will lose unflushed messages, however when
> it comes back to life it will restore these messages from the other
> replicas before it will serve data to consumers. So the log will be
> byte-for-byte identical across all servers including both the contents
> and the ordering of messages.
>
> -Jay
>
> On Tue, May 1, 2012 at 9:24 AM, Felix GV <fe...@mate1inc.com> wrote:
> > Hmm... interesting!
> >
> > So, if I understanding correctly, what you're saying regarding point 2,
> is
> > that the messages are going to be kept in memory on several nodes, and
> > start being served to consumers as soon as this is completed, rather than
> > after the data is flushed to disk? This way, we still benefit from the
> > throughput gain of flushing data to disk in batches, but we consider that
> > the added durability of having in-memory replication is good enough to
> > start serving that data to consumers sooner.
> >
> > Furthermore, this means that in the unlikely event that several nodes
> would
> > fail simultaneously (a correlated failure), the data that is replicated
> to
> > the failed nodes but not yet flushed on any of them would be lost.
> However,
> > when a single node crashes and is then restarted, only the failed node
> will
> > have lost its unflushed data, while the other nodes that had replicated
> > that data will have had the opportunity to flush it to disk later on.
> >
> > Sorry if I'm repeating like a parrot. I just want to make sure I
> understand
> > correctly :)
> >
> > Please correct me if I'm not interpreting this correctly!
> >
> > --
> > Felix
> >
> >
> >
> > On Mon, Apr 30, 2012 at 5:59 PM, Jay Kreps <ja...@gmail.com> wrote:
> >
> >> Yes, it is also worth noting that there are couple of different ways
> >> to think about latency:
> >> 1. latency of the request from the producer's point-of-view
> >> 2. end-to-end latency to the consumer
> >>
> >> As Jun mentions (1) may go up a little because the producer was
> >> sending data without checking for any answer from the server. Although
> >> this gives a nice buffering effect it leads to a number of corner
> >> cases that are hard to deal with correctly. It should be the case that
> >> setting the consumer to async has the same effect from the producer
> >> point of view without the corner cases of having no RPC response to
> >> convey errors and other broker misbehavior.
> >>
> >> (2) May actually get significantly better, especially for lower volume
> >> topics. The reason for this is because currently we wait until data is
> >> flushed to disk before giving it to the consumer, this flush policy is
> >> controlled by setting a number of messages or timeout at which the
> >> flush is forced. The reason to configure this is because on
> >> traditional disks each disk is likely to incur at least one seek. In
> >> the new model replication can take the place of waiting on a disk
> >> flush to provide durability (even if the log of the local server loses
> >> unflushed data as long as all servers don't crash at the same time no
> >> messages will be lost). Doing 2 parallel replication round-trips
> >> (perhaps surprisingly) looks like it may be a lot lower-latency than
> >> doing a local disk flush (< 1ms versus >= 10ms). In our own usage
> >> desire for this kind of low-latency consumption is not common, but I
> >> understand that this is a common need for messaging.
> >>
> >> -Jay
> >>
> >> On Thu, Apr 26, 2012 at 2:03 PM, Felix GV <fe...@mate1inc.com> wrote:
> >> > Thanks Jun :)
> >> >
> >> > --
> >> > Felix
> >> >
> >> >
> >> >
> >> > On Thu, Apr 26, 2012 at 3:26 PM, Jun Rao <ju...@gmail.com> wrote:
> >> >
> >> >> Some comments inlined below.
> >> >>
> >> >> Thanks,
> >> >>
> >> >> Jun
> >> >>
> >> >> On Thu, Apr 26, 2012 at 10:27 AM, Felix GV <fe...@mate1inc.com>
> wrote:
> >> >>
> >> >> > Cool :) Thanks for those insights :) !
> >> >> >
> >> >> > I changed the subject of the thread, in order not to derail the
> >> original
> >> >> > thread's subject...! I just want to recap to make sure I (and
> others)
> >> >> > understand all of this correctly :)
> >> >> >
> >> >> > So, if I understand correctly, with acks == [0,1] Kafka should
> >> provide a
> >> >> > latency that is similar to what we have now, but with the
> possibility
> >> of
> >> >> > losing a small window of unreplicated events in the case of an
> >> >> > unrecoverable hardware failure, and with acks > 1 (or acks == -1)
> >> there
> >> >> > will probably be a latency penalty but we will be completely
> protected
> >> >> from
> >> >> > (non-correlated) hardware failures, right?
> >> >> >
> >> >> > This is mostly true. The difference is that in 0.7, producer
> doesn't
> >> wait
> >> >> for a TCP response from broker. In 0.8, the producer always waits
> for a
> >> >> response from broker. How quickly the broker sends the response
> depends
> >> on
> >> >> acks. If acks is less than ideal, you may get the response faster,
> but
> >> have
> >> >> some risk of losing the data if there is broker failure.
> >> >>
> >> >>
> >> >> > Also, I guess the above assumptions are correct for a batch size
> of 1,
> >> >> and
> >> >> > that bigger batch sizes could also lead to small windows of
> unwritten
> >> >> data
> >> >> > in cases of failures, just like now...? Although, now that I think
> of
> >> >> it, I
> >> >> > guess the vulnerability of bigger batch sizes would, again, only
> come
> >> >> into
> >> >> > play in scenarios of unrecoverable correlated failures, since even
> if
> >> a
> >> >> > machine fails with some partially committed batch, there would be
> >> other
> >> >> > machines who received the same data (through replication) and would
> >> have
> >> >> > enough time to commit those batches...
> >> >> >
> >> >> > I want to add that if the producer itself dies, it could lose a
> batch
> >> of
> >> >> events.
> >> >>
> >> >>
> >> >> > Finally, I guess that replication (whatever the ack parameter is)
> will
> >> >> > affect the overall throughput capacity of the Kafka cluster, since
> >> every
> >> >> > node will now be writing its own data as well as the replicated
> data
> >> from
> >> >> > +/- 2 other nodes, right?
> >> >> >
> >> >> > --
> >> >> > Felix
> >> >> >
> >> >> >
> >> >> >
> >> >> > On Wed, Apr 25, 2012 at 6:32 PM, Jay Kreps <ja...@gmail.com>
> >> wrote:
> >> >> >
> >> >> > > Short answer is yes, both async (acks=0 or 1) and sync
> replication
> >> >> > > (acks > 1) will be both be supported.
> >> >> > >
> >> >> > > -Jay
> >> >> > >
> >> >> > > On Wed, Apr 25, 2012 at 11:22 AM, Jun Rao <ju...@gmail.com>
> wrote:
> >> >> > > > Felix,
> >> >> > > >
> >> >> > > > Initially, we thought we could keep the option of not sending
> acks
> >> >> from
> >> >> > > the
> >> >> > > > broker to the producer. However, this seems hard since in the
> new
> >> >> wire
> >> >> > > > protocol, we need to send at least the error code to the
> producer
> >> >> > (e.g.,
> >> >> > > a
> >> >> > > > request is sent to the wrong broker or wrong partition).
> >> >> > > >
> >> >> > > > So, what we allow in the current design is the following. The
> >> >> producer
> >> >> > > can
> >> >> > > > specify the # of acks in the request. By default (acks = -1),
> the
> >> >> > broker
> >> >> > > > will wait for the message to be written to all replicas that
> are
> >> >> still
> >> >> > > > synced up with the leader before acking the producer. Otherwise
> >> (acks
> >> >> > > >=0),
> >> >> > > > the broker will ack the producer after the message is written
> to
> >> acks
> >> >> > > > replicas. Currently, acks=0 is treated the same as acks=1.
> >> >> > > >
> >> >> > > > Thanks,
> >> >> > > >
> >> >> > > > Jun
> >> >> > > >
> >> >> > > > On Wed, Apr 25, 2012 at 10:39 AM, Felix GV <felix@mate1inc.com
> >
> >> >> wrote:
> >> >> > > >
> >> >> > > >> Just curious, but if I remember correctly from the time I read
> >> >> > KAFKA-50
> >> >> > > and
> >> >> > > >> the related JIRA issues, you guys plan to implement sync AND
> >> async
> >> >> > > >> replication, right?
> >> >> > > >>
> >> >> > > >> --
> >> >> > > >> Felix
> >> >> > > >>
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> On Tue, Apr 24, 2012 at 4:42 PM, Jay Kreps <
> jay.kreps@gmail.com>
> >> >> > wrote:
> >> >> > > >>
> >> >> > > >> > Right now we do sloppy failover. That is when a broker goes
> >> down
> >> >> > > >> > traffic is redirected to the remaining machines, but any
> >> >> unconsumed
> >> >> > > >> > messages are stuck on that server until it comes back, if
> it is
> >> >> > > >> > permanently gone the messages are lost. This is acceptable
> for
> >> us
> >> >> in
> >> >> > > >> > the near-term since our pipeline is pretty real-time so this
> >> >> window
> >> >> > > >> > between production and consumption is pretty small. The
> >> complete
> >> >> > > >> > solution is the intra-cluster replication in KAFA-50 which
> is
> >> >> coming
> >> >> > > >> > along fairly nicely now that we are working on it.
> >> >> > > >> >
> >> >> > > >> > -Jay
> >> >> > > >> >
> >> >> > > >> > On Tue, Apr 24, 2012 at 12:21 PM, Oliver Krohne
> >> >> > > >> > <ol...@googlemail.com> wrote:
> >> >> > > >> > > Hi,
> >> >> > > >> > >
> >> >> > > >> > > indeed I thought could be used as failover approach.
> >> >> > > >> > >
> >> >> > > >> > > We use raid for local redundancy but it does not protect
> us
> >> in
> >> >> > case
> >> >> > > of
> >> >> > > >> a
> >> >> > > >> > machine failure, so I am looking for a way to achieve a
> >> >> master/slave
> >> >> > > >> setup
> >> >> > > >> > until KAFKA-50 has been implemented.
> >> >> > > >> > >
> >> >> > > >> > > I think we can solve it for now by having multiple broker
> so
> >> >> that
> >> >> > > the
> >> >> > > >> > application can continue sending messages if one broker goes
> >> down.
> >> >> > My
> >> >> > > >> main
> >> >> > > >> > concern is to not introduce a new single point of failure
> which
> >> >> can
> >> >> > > stop
> >> >> > > >> > the application. However as some consumer are not developed
> by
> >> us
> >> >> > and
> >> >> > > it
> >> >> > > >> is
> >> >> > > >> > not clear how they store the offset in zookeeper we need to
> >> find
> >> >> out
> >> >> > > how
> >> >> > > >> we
> >> >> > > >> > can manage the consumer in case a broker will never return
> >> after a
> >> >> > > >> failure.
> >> >> > > >> > It will be acceptable to lose a couple of messages if a
> broker
> >> >> dies
> >> >> > > and
> >> >> > > >> the
> >> >> > > >> > consumers have not consumed all messages at the point of
> >> failure.
> >> >> > > >> > >
> >> >> > > >> > > Thanks,
> >> >> > > >> > > Oliver
> >> >> > > >> > >
> >> >> > > >> > >
> >> >> > > >> > >
> >> >> > > >> > >
> >> >> > > >> > > Am 23.04.2012 um 19:58 schrieb Jay Kreps:
> >> >> > > >> > >
> >> >> > > >> > >> I think the confusion comes from the fact that we are
> using
> >> >> > > mirroring
> >> >> > > >> to
> >> >> > > >> > >> handle geographic distribution not failover. If I
> understand
> >> >> > > correctly
> >> >> > > >> > what
> >> >> > > >> > >> Oliver is asking for is something to give fault tolerance
> >> not
> >> >> > > >> something
> >> >> > > >> > for
> >> >> > > >> > >> distribution. I don't think that is really what the
> >> mirroring
> >> >> > does
> >> >> > > out
> >> >> > > >> > of
> >> >> > > >> > >> the box, though technically i suppose you could just
> reset
> >> the
> >> >> > > offsets
> >> >> > > >> > and
> >> >> > > >> > >> point the consumer at the new cluster and have it start
> from
> >> >> > "now".
> >> >> > > >> > >>
> >> >> > > >> > >> I think it would be helpful to document our use case in
> the
> >> >> > > mirroring
> >> >> > > >> > docs
> >> >> > > >> > >> since this is not the first time someone has asked about
> >> this.
> >> >> > > >> > >>
> >> >> > > >> > >> -Jay
> >> >> > > >> > >>
> >> >> > > >> > >> On Mon, Apr 23, 2012 at 10:38 AM, Joel Koshy <
> >> >> > jjkoshy.w@gmail.com>
> >> >> > > >> > wrote:
> >> >> > > >> > >>
> >> >> > > >> > >>> Hi Oliver,
> >> >> > > >> > >>>
> >> >> > > >> > >>> I was reading the mirroring guide and I wonder if it is
> >> >> required
> >> >> > > that
> >> >> > > >> > the
> >> >> > > >> > >>>> mirror runs it's own zookeeper?
> >> >> > > >> > >>>>
> >> >> > > >> > >>>> We have a zookeeper cluster running which is used by
> >> >> different
> >> >> > > >> > >>>> applications, so can we use that zookeeper cluster for
> the
> >> >> > kafka
> >> >> > > >> > source
> >> >> > > >> > >>> and
> >> >> > > >> > >>>> kafka mirror?
> >> >> > > >> > >>>>
> >> >> > > >> > >>>
> >> >> > > >> > >>> You could have a single zookeeper cluster and use
> different
> >> >> > > >> namespaces
> >> >> > > >> > for
> >> >> > > >> > >>> the source/target mirror. However, I don't think it is
> >> >> > > recommended to
> >> >> > > >> > use a
> >> >> > > >> > >>> remote zookeeper (if you have a cross-DC set up) since
> that
> >> >> > would
> >> >> > > >> > >>> potentially mean very high ZK latencies on one of your
> >> >> clusters.
> >> >> > > >> > >>>
> >> >> > > >> > >>>
> >> >> > > >> > >>>> What is the procedure if the kafka source server fails
> to
> >> >> > switch
> >> >> > > the
> >> >> > > >> > >>>> applications to use the mirrored instance?
> >> >> > > >> > >>>>
> >> >> > > >> > >>>
> >> >> > > >> > >>> I don't quite follow this question - can you clarify?
> The
> >> >> mirror
> >> >> > > >> > cluster is
> >> >> > > >> > >>> pretty much a separate instance. There is no built-in
> >> >> automatic
> >> >> > > >> > fail-over
> >> >> > > >> > >>> if your source cluster goes down.
> >> >> > > >> > >>>
> >> >> > > >> > >>>
> >> >> > > >> > >>>> Are there any backup best practices if we would not use
> >> >> > > mirroring?
> >> >> > > >> > >>>>
> >> >> > > >> > >>>
> >> >> > > >> > >>> You can use RAID arrays for (local) data redundancy. You
> >> may
> >> >> > also
> >> >> > > be
> >> >> > > >> > >>> interested in the (intra-DC) replication feature
> (KAFKA-50)
> >> >> that
> >> >> > > is
> >> >> > > >> > >>> currently being developed. I believe some folks on this
> >> list
> >> >> > have
> >> >> > > >> also
> >> >> > > >> > used
> >> >> > > >> > >>> plain rsync's as an alternative to mirroring.
> >> >> > > >> > >>>
> >> >> > > >> > >>> Thanks,
> >> >> > > >> > >>>
> >> >> > > >> > >>> Joel
> >> >> > > >> > >>>
> >> >> > > >> > >
> >> >> > > >> >
> >> >> > > >>
> >> >> > >
> >> >> >
> >> >>
> >>
>

Re: Replication questions

Posted by Jay Kreps <ja...@gmail.com>.
Yes, that is correct. Technically we always immediately write to the
filesystem, it is just a question of when you fsync the file (that is
the slow thing). So though it is in memory it is not in application
memory, so it always survives kill -9 but not unplugging the machine.
Currently when a broker fails messages that are flushed to disk come
back if the broker comes back with an intact filesystem (if the broker
fs is destroyed then it is lost). With replication we retain this same
flexibility on the flush policy, so you can flush every message to
disk immediately if you like, however having the message on multiple
machines is in some ways better durability then the fsync gives, as
the message will survive destruction of the filesystem, so we think
you can legitimately allow consumers to consume messages independent
of the flush policy.

Also when a broker fails it will lose unflushed messages, however when
it comes back to life it will restore these messages from the other
replicas before it will serve data to consumers. So the log will be
byte-for-byte identical across all servers including both the contents
and the ordering of messages.

-Jay

On Tue, May 1, 2012 at 9:24 AM, Felix GV <fe...@mate1inc.com> wrote:
> Hmm... interesting!
>
> So, if I understanding correctly, what you're saying regarding point 2, is
> that the messages are going to be kept in memory on several nodes, and
> start being served to consumers as soon as this is completed, rather than
> after the data is flushed to disk? This way, we still benefit from the
> throughput gain of flushing data to disk in batches, but we consider that
> the added durability of having in-memory replication is good enough to
> start serving that data to consumers sooner.
>
> Furthermore, this means that in the unlikely event that several nodes would
> fail simultaneously (a correlated failure), the data that is replicated to
> the failed nodes but not yet flushed on any of them would be lost. However,
> when a single node crashes and is then restarted, only the failed node will
> have lost its unflushed data, while the other nodes that had replicated
> that data will have had the opportunity to flush it to disk later on.
>
> Sorry if I'm repeating like a parrot. I just want to make sure I understand
> correctly :)
>
> Please correct me if I'm not interpreting this correctly!
>
> --
> Felix
>
>
>
> On Mon, Apr 30, 2012 at 5:59 PM, Jay Kreps <ja...@gmail.com> wrote:
>
>> Yes, it is also worth noting that there are couple of different ways
>> to think about latency:
>> 1. latency of the request from the producer's point-of-view
>> 2. end-to-end latency to the consumer
>>
>> As Jun mentions (1) may go up a little because the producer was
>> sending data without checking for any answer from the server. Although
>> this gives a nice buffering effect it leads to a number of corner
>> cases that are hard to deal with correctly. It should be the case that
>> setting the consumer to async has the same effect from the producer
>> point of view without the corner cases of having no RPC response to
>> convey errors and other broker misbehavior.
>>
>> (2) May actually get significantly better, especially for lower volume
>> topics. The reason for this is because currently we wait until data is
>> flushed to disk before giving it to the consumer, this flush policy is
>> controlled by setting a number of messages or timeout at which the
>> flush is forced. The reason to configure this is because on
>> traditional disks each disk is likely to incur at least one seek. In
>> the new model replication can take the place of waiting on a disk
>> flush to provide durability (even if the log of the local server loses
>> unflushed data as long as all servers don't crash at the same time no
>> messages will be lost). Doing 2 parallel replication round-trips
>> (perhaps surprisingly) looks like it may be a lot lower-latency than
>> doing a local disk flush (< 1ms versus >= 10ms). In our own usage
>> desire for this kind of low-latency consumption is not common, but I
>> understand that this is a common need for messaging.
>>
>> -Jay
>>
>> On Thu, Apr 26, 2012 at 2:03 PM, Felix GV <fe...@mate1inc.com> wrote:
>> > Thanks Jun :)
>> >
>> > --
>> > Felix
>> >
>> >
>> >
>> > On Thu, Apr 26, 2012 at 3:26 PM, Jun Rao <ju...@gmail.com> wrote:
>> >
>> >> Some comments inlined below.
>> >>
>> >> Thanks,
>> >>
>> >> Jun
>> >>
>> >> On Thu, Apr 26, 2012 at 10:27 AM, Felix GV <fe...@mate1inc.com> wrote:
>> >>
>> >> > Cool :) Thanks for those insights :) !
>> >> >
>> >> > I changed the subject of the thread, in order not to derail the
>> original
>> >> > thread's subject...! I just want to recap to make sure I (and others)
>> >> > understand all of this correctly :)
>> >> >
>> >> > So, if I understand correctly, with acks == [0,1] Kafka should
>> provide a
>> >> > latency that is similar to what we have now, but with the possibility
>> of
>> >> > losing a small window of unreplicated events in the case of an
>> >> > unrecoverable hardware failure, and with acks > 1 (or acks == -1)
>> there
>> >> > will probably be a latency penalty but we will be completely protected
>> >> from
>> >> > (non-correlated) hardware failures, right?
>> >> >
>> >> > This is mostly true. The difference is that in 0.7, producer doesn't
>> wait
>> >> for a TCP response from broker. In 0.8, the producer always waits for a
>> >> response from broker. How quickly the broker sends the response depends
>> on
>> >> acks. If acks is less than ideal, you may get the response faster, but
>> have
>> >> some risk of losing the data if there is broker failure.
>> >>
>> >>
>> >> > Also, I guess the above assumptions are correct for a batch size of 1,
>> >> and
>> >> > that bigger batch sizes could also lead to small windows of unwritten
>> >> data
>> >> > in cases of failures, just like now...? Although, now that I think of
>> >> it, I
>> >> > guess the vulnerability of bigger batch sizes would, again, only come
>> >> into
>> >> > play in scenarios of unrecoverable correlated failures, since even if
>> a
>> >> > machine fails with some partially committed batch, there would be
>> other
>> >> > machines who received the same data (through replication) and would
>> have
>> >> > enough time to commit those batches...
>> >> >
>> >> > I want to add that if the producer itself dies, it could lose a batch
>> of
>> >> events.
>> >>
>> >>
>> >> > Finally, I guess that replication (whatever the ack parameter is) will
>> >> > affect the overall throughput capacity of the Kafka cluster, since
>> every
>> >> > node will now be writing its own data as well as the replicated data
>> from
>> >> > +/- 2 other nodes, right?
>> >> >
>> >> > --
>> >> > Felix
>> >> >
>> >> >
>> >> >
>> >> > On Wed, Apr 25, 2012 at 6:32 PM, Jay Kreps <ja...@gmail.com>
>> wrote:
>> >> >
>> >> > > Short answer is yes, both async (acks=0 or 1) and sync replication
>> >> > > (acks > 1) will be both be supported.
>> >> > >
>> >> > > -Jay
>> >> > >
>> >> > > On Wed, Apr 25, 2012 at 11:22 AM, Jun Rao <ju...@gmail.com> wrote:
>> >> > > > Felix,
>> >> > > >
>> >> > > > Initially, we thought we could keep the option of not sending acks
>> >> from
>> >> > > the
>> >> > > > broker to the producer. However, this seems hard since in the new
>> >> wire
>> >> > > > protocol, we need to send at least the error code to the producer
>> >> > (e.g.,
>> >> > > a
>> >> > > > request is sent to the wrong broker or wrong partition).
>> >> > > >
>> >> > > > So, what we allow in the current design is the following. The
>> >> producer
>> >> > > can
>> >> > > > specify the # of acks in the request. By default (acks = -1), the
>> >> > broker
>> >> > > > will wait for the message to be written to all replicas that are
>> >> still
>> >> > > > synced up with the leader before acking the producer. Otherwise
>> (acks
>> >> > > >=0),
>> >> > > > the broker will ack the producer after the message is written to
>> acks
>> >> > > > replicas. Currently, acks=0 is treated the same as acks=1.
>> >> > > >
>> >> > > > Thanks,
>> >> > > >
>> >> > > > Jun
>> >> > > >
>> >> > > > On Wed, Apr 25, 2012 at 10:39 AM, Felix GV <fe...@mate1inc.com>
>> >> wrote:
>> >> > > >
>> >> > > >> Just curious, but if I remember correctly from the time I read
>> >> > KAFKA-50
>> >> > > and
>> >> > > >> the related JIRA issues, you guys plan to implement sync AND
>> async
>> >> > > >> replication, right?
>> >> > > >>
>> >> > > >> --
>> >> > > >> Felix
>> >> > > >>
>> >> > > >>
>> >> > > >>
>> >> > > >> On Tue, Apr 24, 2012 at 4:42 PM, Jay Kreps <ja...@gmail.com>
>> >> > wrote:
>> >> > > >>
>> >> > > >> > Right now we do sloppy failover. That is when a broker goes
>> down
>> >> > > >> > traffic is redirected to the remaining machines, but any
>> >> unconsumed
>> >> > > >> > messages are stuck on that server until it comes back, if it is
>> >> > > >> > permanently gone the messages are lost. This is acceptable for
>> us
>> >> in
>> >> > > >> > the near-term since our pipeline is pretty real-time so this
>> >> window
>> >> > > >> > between production and consumption is pretty small. The
>> complete
>> >> > > >> > solution is the intra-cluster replication in KAFA-50 which is
>> >> coming
>> >> > > >> > along fairly nicely now that we are working on it.
>> >> > > >> >
>> >> > > >> > -Jay
>> >> > > >> >
>> >> > > >> > On Tue, Apr 24, 2012 at 12:21 PM, Oliver Krohne
>> >> > > >> > <ol...@googlemail.com> wrote:
>> >> > > >> > > Hi,
>> >> > > >> > >
>> >> > > >> > > indeed I thought could be used as failover approach.
>> >> > > >> > >
>> >> > > >> > > We use raid for local redundancy but it does not protect us
>> in
>> >> > case
>> >> > > of
>> >> > > >> a
>> >> > > >> > machine failure, so I am looking for a way to achieve a
>> >> master/slave
>> >> > > >> setup
>> >> > > >> > until KAFKA-50 has been implemented.
>> >> > > >> > >
>> >> > > >> > > I think we can solve it for now by having multiple broker so
>> >> that
>> >> > > the
>> >> > > >> > application can continue sending messages if one broker goes
>> down.
>> >> > My
>> >> > > >> main
>> >> > > >> > concern is to not introduce a new single point of failure which
>> >> can
>> >> > > stop
>> >> > > >> > the application. However as some consumer are not developed by
>> us
>> >> > and
>> >> > > it
>> >> > > >> is
>> >> > > >> > not clear how they store the offset in zookeeper we need to
>> find
>> >> out
>> >> > > how
>> >> > > >> we
>> >> > > >> > can manage the consumer in case a broker will never return
>> after a
>> >> > > >> failure.
>> >> > > >> > It will be acceptable to lose a couple of messages if a broker
>> >> dies
>> >> > > and
>> >> > > >> the
>> >> > > >> > consumers have not consumed all messages at the point of
>> failure.
>> >> > > >> > >
>> >> > > >> > > Thanks,
>> >> > > >> > > Oliver
>> >> > > >> > >
>> >> > > >> > >
>> >> > > >> > >
>> >> > > >> > >
>> >> > > >> > > Am 23.04.2012 um 19:58 schrieb Jay Kreps:
>> >> > > >> > >
>> >> > > >> > >> I think the confusion comes from the fact that we are using
>> >> > > mirroring
>> >> > > >> to
>> >> > > >> > >> handle geographic distribution not failover. If I understand
>> >> > > correctly
>> >> > > >> > what
>> >> > > >> > >> Oliver is asking for is something to give fault tolerance
>> not
>> >> > > >> something
>> >> > > >> > for
>> >> > > >> > >> distribution. I don't think that is really what the
>> mirroring
>> >> > does
>> >> > > out
>> >> > > >> > of
>> >> > > >> > >> the box, though technically i suppose you could just reset
>> the
>> >> > > offsets
>> >> > > >> > and
>> >> > > >> > >> point the consumer at the new cluster and have it start from
>> >> > "now".
>> >> > > >> > >>
>> >> > > >> > >> I think it would be helpful to document our use case in the
>> >> > > mirroring
>> >> > > >> > docs
>> >> > > >> > >> since this is not the first time someone has asked about
>> this.
>> >> > > >> > >>
>> >> > > >> > >> -Jay
>> >> > > >> > >>
>> >> > > >> > >> On Mon, Apr 23, 2012 at 10:38 AM, Joel Koshy <
>> >> > jjkoshy.w@gmail.com>
>> >> > > >> > wrote:
>> >> > > >> > >>
>> >> > > >> > >>> Hi Oliver,
>> >> > > >> > >>>
>> >> > > >> > >>> I was reading the mirroring guide and I wonder if it is
>> >> required
>> >> > > that
>> >> > > >> > the
>> >> > > >> > >>>> mirror runs it's own zookeeper?
>> >> > > >> > >>>>
>> >> > > >> > >>>> We have a zookeeper cluster running which is used by
>> >> different
>> >> > > >> > >>>> applications, so can we use that zookeeper cluster for the
>> >> > kafka
>> >> > > >> > source
>> >> > > >> > >>> and
>> >> > > >> > >>>> kafka mirror?
>> >> > > >> > >>>>
>> >> > > >> > >>>
>> >> > > >> > >>> You could have a single zookeeper cluster and use different
>> >> > > >> namespaces
>> >> > > >> > for
>> >> > > >> > >>> the source/target mirror. However, I don't think it is
>> >> > > recommended to
>> >> > > >> > use a
>> >> > > >> > >>> remote zookeeper (if you have a cross-DC set up) since that
>> >> > would
>> >> > > >> > >>> potentially mean very high ZK latencies on one of your
>> >> clusters.
>> >> > > >> > >>>
>> >> > > >> > >>>
>> >> > > >> > >>>> What is the procedure if the kafka source server fails to
>> >> > switch
>> >> > > the
>> >> > > >> > >>>> applications to use the mirrored instance?
>> >> > > >> > >>>>
>> >> > > >> > >>>
>> >> > > >> > >>> I don't quite follow this question - can you clarify? The
>> >> mirror
>> >> > > >> > cluster is
>> >> > > >> > >>> pretty much a separate instance. There is no built-in
>> >> automatic
>> >> > > >> > fail-over
>> >> > > >> > >>> if your source cluster goes down.
>> >> > > >> > >>>
>> >> > > >> > >>>
>> >> > > >> > >>>> Are there any backup best practices if we would not use
>> >> > > mirroring?
>> >> > > >> > >>>>
>> >> > > >> > >>>
>> >> > > >> > >>> You can use RAID arrays for (local) data redundancy. You
>> may
>> >> > also
>> >> > > be
>> >> > > >> > >>> interested in the (intra-DC) replication feature (KAFKA-50)
>> >> that
>> >> > > is
>> >> > > >> > >>> currently being developed. I believe some folks on this
>> list
>> >> > have
>> >> > > >> also
>> >> > > >> > used
>> >> > > >> > >>> plain rsync's as an alternative to mirroring.
>> >> > > >> > >>>
>> >> > > >> > >>> Thanks,
>> >> > > >> > >>>
>> >> > > >> > >>> Joel
>> >> > > >> > >>>
>> >> > > >> > >
>> >> > > >> >
>> >> > > >>
>> >> > >
>> >> >
>> >>
>>

Re: Replication questions

Posted by Felix GV <fe...@mate1inc.com>.
Hmm... interesting!

So, if I understanding correctly, what you're saying regarding point 2, is
that the messages are going to be kept in memory on several nodes, and
start being served to consumers as soon as this is completed, rather than
after the data is flushed to disk? This way, we still benefit from the
throughput gain of flushing data to disk in batches, but we consider that
the added durability of having in-memory replication is good enough to
start serving that data to consumers sooner.

Furthermore, this means that in the unlikely event that several nodes would
fail simultaneously (a correlated failure), the data that is replicated to
the failed nodes but not yet flushed on any of them would be lost. However,
when a single node crashes and is then restarted, only the failed node will
have lost its unflushed data, while the other nodes that had replicated
that data will have had the opportunity to flush it to disk later on.

Sorry if I'm repeating like a parrot. I just want to make sure I understand
correctly :)

Please correct me if I'm not interpreting this correctly!

--
Felix



On Mon, Apr 30, 2012 at 5:59 PM, Jay Kreps <ja...@gmail.com> wrote:

> Yes, it is also worth noting that there are couple of different ways
> to think about latency:
> 1. latency of the request from the producer's point-of-view
> 2. end-to-end latency to the consumer
>
> As Jun mentions (1) may go up a little because the producer was
> sending data without checking for any answer from the server. Although
> this gives a nice buffering effect it leads to a number of corner
> cases that are hard to deal with correctly. It should be the case that
> setting the consumer to async has the same effect from the producer
> point of view without the corner cases of having no RPC response to
> convey errors and other broker misbehavior.
>
> (2) May actually get significantly better, especially for lower volume
> topics. The reason for this is because currently we wait until data is
> flushed to disk before giving it to the consumer, this flush policy is
> controlled by setting a number of messages or timeout at which the
> flush is forced. The reason to configure this is because on
> traditional disks each disk is likely to incur at least one seek. In
> the new model replication can take the place of waiting on a disk
> flush to provide durability (even if the log of the local server loses
> unflushed data as long as all servers don't crash at the same time no
> messages will be lost). Doing 2 parallel replication round-trips
> (perhaps surprisingly) looks like it may be a lot lower-latency than
> doing a local disk flush (< 1ms versus >= 10ms). In our own usage
> desire for this kind of low-latency consumption is not common, but I
> understand that this is a common need for messaging.
>
> -Jay
>
> On Thu, Apr 26, 2012 at 2:03 PM, Felix GV <fe...@mate1inc.com> wrote:
> > Thanks Jun :)
> >
> > --
> > Felix
> >
> >
> >
> > On Thu, Apr 26, 2012 at 3:26 PM, Jun Rao <ju...@gmail.com> wrote:
> >
> >> Some comments inlined below.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Thu, Apr 26, 2012 at 10:27 AM, Felix GV <fe...@mate1inc.com> wrote:
> >>
> >> > Cool :) Thanks for those insights :) !
> >> >
> >> > I changed the subject of the thread, in order not to derail the
> original
> >> > thread's subject...! I just want to recap to make sure I (and others)
> >> > understand all of this correctly :)
> >> >
> >> > So, if I understand correctly, with acks == [0,1] Kafka should
> provide a
> >> > latency that is similar to what we have now, but with the possibility
> of
> >> > losing a small window of unreplicated events in the case of an
> >> > unrecoverable hardware failure, and with acks > 1 (or acks == -1)
> there
> >> > will probably be a latency penalty but we will be completely protected
> >> from
> >> > (non-correlated) hardware failures, right?
> >> >
> >> > This is mostly true. The difference is that in 0.7, producer doesn't
> wait
> >> for a TCP response from broker. In 0.8, the producer always waits for a
> >> response from broker. How quickly the broker sends the response depends
> on
> >> acks. If acks is less than ideal, you may get the response faster, but
> have
> >> some risk of losing the data if there is broker failure.
> >>
> >>
> >> > Also, I guess the above assumptions are correct for a batch size of 1,
> >> and
> >> > that bigger batch sizes could also lead to small windows of unwritten
> >> data
> >> > in cases of failures, just like now...? Although, now that I think of
> >> it, I
> >> > guess the vulnerability of bigger batch sizes would, again, only come
> >> into
> >> > play in scenarios of unrecoverable correlated failures, since even if
> a
> >> > machine fails with some partially committed batch, there would be
> other
> >> > machines who received the same data (through replication) and would
> have
> >> > enough time to commit those batches...
> >> >
> >> > I want to add that if the producer itself dies, it could lose a batch
> of
> >> events.
> >>
> >>
> >> > Finally, I guess that replication (whatever the ack parameter is) will
> >> > affect the overall throughput capacity of the Kafka cluster, since
> every
> >> > node will now be writing its own data as well as the replicated data
> from
> >> > +/- 2 other nodes, right?
> >> >
> >> > --
> >> > Felix
> >> >
> >> >
> >> >
> >> > On Wed, Apr 25, 2012 at 6:32 PM, Jay Kreps <ja...@gmail.com>
> wrote:
> >> >
> >> > > Short answer is yes, both async (acks=0 or 1) and sync replication
> >> > > (acks > 1) will be both be supported.
> >> > >
> >> > > -Jay
> >> > >
> >> > > On Wed, Apr 25, 2012 at 11:22 AM, Jun Rao <ju...@gmail.com> wrote:
> >> > > > Felix,
> >> > > >
> >> > > > Initially, we thought we could keep the option of not sending acks
> >> from
> >> > > the
> >> > > > broker to the producer. However, this seems hard since in the new
> >> wire
> >> > > > protocol, we need to send at least the error code to the producer
> >> > (e.g.,
> >> > > a
> >> > > > request is sent to the wrong broker or wrong partition).
> >> > > >
> >> > > > So, what we allow in the current design is the following. The
> >> producer
> >> > > can
> >> > > > specify the # of acks in the request. By default (acks = -1), the
> >> > broker
> >> > > > will wait for the message to be written to all replicas that are
> >> still
> >> > > > synced up with the leader before acking the producer. Otherwise
> (acks
> >> > > >=0),
> >> > > > the broker will ack the producer after the message is written to
> acks
> >> > > > replicas. Currently, acks=0 is treated the same as acks=1.
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Jun
> >> > > >
> >> > > > On Wed, Apr 25, 2012 at 10:39 AM, Felix GV <fe...@mate1inc.com>
> >> wrote:
> >> > > >
> >> > > >> Just curious, but if I remember correctly from the time I read
> >> > KAFKA-50
> >> > > and
> >> > > >> the related JIRA issues, you guys plan to implement sync AND
> async
> >> > > >> replication, right?
> >> > > >>
> >> > > >> --
> >> > > >> Felix
> >> > > >>
> >> > > >>
> >> > > >>
> >> > > >> On Tue, Apr 24, 2012 at 4:42 PM, Jay Kreps <ja...@gmail.com>
> >> > wrote:
> >> > > >>
> >> > > >> > Right now we do sloppy failover. That is when a broker goes
> down
> >> > > >> > traffic is redirected to the remaining machines, but any
> >> unconsumed
> >> > > >> > messages are stuck on that server until it comes back, if it is
> >> > > >> > permanently gone the messages are lost. This is acceptable for
> us
> >> in
> >> > > >> > the near-term since our pipeline is pretty real-time so this
> >> window
> >> > > >> > between production and consumption is pretty small. The
> complete
> >> > > >> > solution is the intra-cluster replication in KAFA-50 which is
> >> coming
> >> > > >> > along fairly nicely now that we are working on it.
> >> > > >> >
> >> > > >> > -Jay
> >> > > >> >
> >> > > >> > On Tue, Apr 24, 2012 at 12:21 PM, Oliver Krohne
> >> > > >> > <ol...@googlemail.com> wrote:
> >> > > >> > > Hi,
> >> > > >> > >
> >> > > >> > > indeed I thought could be used as failover approach.
> >> > > >> > >
> >> > > >> > > We use raid for local redundancy but it does not protect us
> in
> >> > case
> >> > > of
> >> > > >> a
> >> > > >> > machine failure, so I am looking for a way to achieve a
> >> master/slave
> >> > > >> setup
> >> > > >> > until KAFKA-50 has been implemented.
> >> > > >> > >
> >> > > >> > > I think we can solve it for now by having multiple broker so
> >> that
> >> > > the
> >> > > >> > application can continue sending messages if one broker goes
> down.
> >> > My
> >> > > >> main
> >> > > >> > concern is to not introduce a new single point of failure which
> >> can
> >> > > stop
> >> > > >> > the application. However as some consumer are not developed by
> us
> >> > and
> >> > > it
> >> > > >> is
> >> > > >> > not clear how they store the offset in zookeeper we need to
> find
> >> out
> >> > > how
> >> > > >> we
> >> > > >> > can manage the consumer in case a broker will never return
> after a
> >> > > >> failure.
> >> > > >> > It will be acceptable to lose a couple of messages if a broker
> >> dies
> >> > > and
> >> > > >> the
> >> > > >> > consumers have not consumed all messages at the point of
> failure.
> >> > > >> > >
> >> > > >> > > Thanks,
> >> > > >> > > Oliver
> >> > > >> > >
> >> > > >> > >
> >> > > >> > >
> >> > > >> > >
> >> > > >> > > Am 23.04.2012 um 19:58 schrieb Jay Kreps:
> >> > > >> > >
> >> > > >> > >> I think the confusion comes from the fact that we are using
> >> > > mirroring
> >> > > >> to
> >> > > >> > >> handle geographic distribution not failover. If I understand
> >> > > correctly
> >> > > >> > what
> >> > > >> > >> Oliver is asking for is something to give fault tolerance
> not
> >> > > >> something
> >> > > >> > for
> >> > > >> > >> distribution. I don't think that is really what the
> mirroring
> >> > does
> >> > > out
> >> > > >> > of
> >> > > >> > >> the box, though technically i suppose you could just reset
> the
> >> > > offsets
> >> > > >> > and
> >> > > >> > >> point the consumer at the new cluster and have it start from
> >> > "now".
> >> > > >> > >>
> >> > > >> > >> I think it would be helpful to document our use case in the
> >> > > mirroring
> >> > > >> > docs
> >> > > >> > >> since this is not the first time someone has asked about
> this.
> >> > > >> > >>
> >> > > >> > >> -Jay
> >> > > >> > >>
> >> > > >> > >> On Mon, Apr 23, 2012 at 10:38 AM, Joel Koshy <
> >> > jjkoshy.w@gmail.com>
> >> > > >> > wrote:
> >> > > >> > >>
> >> > > >> > >>> Hi Oliver,
> >> > > >> > >>>
> >> > > >> > >>> I was reading the mirroring guide and I wonder if it is
> >> required
> >> > > that
> >> > > >> > the
> >> > > >> > >>>> mirror runs it's own zookeeper?
> >> > > >> > >>>>
> >> > > >> > >>>> We have a zookeeper cluster running which is used by
> >> different
> >> > > >> > >>>> applications, so can we use that zookeeper cluster for the
> >> > kafka
> >> > > >> > source
> >> > > >> > >>> and
> >> > > >> > >>>> kafka mirror?
> >> > > >> > >>>>
> >> > > >> > >>>
> >> > > >> > >>> You could have a single zookeeper cluster and use different
> >> > > >> namespaces
> >> > > >> > for
> >> > > >> > >>> the source/target mirror. However, I don't think it is
> >> > > recommended to
> >> > > >> > use a
> >> > > >> > >>> remote zookeeper (if you have a cross-DC set up) since that
> >> > would
> >> > > >> > >>> potentially mean very high ZK latencies on one of your
> >> clusters.
> >> > > >> > >>>
> >> > > >> > >>>
> >> > > >> > >>>> What is the procedure if the kafka source server fails to
> >> > switch
> >> > > the
> >> > > >> > >>>> applications to use the mirrored instance?
> >> > > >> > >>>>
> >> > > >> > >>>
> >> > > >> > >>> I don't quite follow this question - can you clarify? The
> >> mirror
> >> > > >> > cluster is
> >> > > >> > >>> pretty much a separate instance. There is no built-in
> >> automatic
> >> > > >> > fail-over
> >> > > >> > >>> if your source cluster goes down.
> >> > > >> > >>>
> >> > > >> > >>>
> >> > > >> > >>>> Are there any backup best practices if we would not use
> >> > > mirroring?
> >> > > >> > >>>>
> >> > > >> > >>>
> >> > > >> > >>> You can use RAID arrays for (local) data redundancy. You
> may
> >> > also
> >> > > be
> >> > > >> > >>> interested in the (intra-DC) replication feature (KAFKA-50)
> >> that
> >> > > is
> >> > > >> > >>> currently being developed. I believe some folks on this
> list
> >> > have
> >> > > >> also
> >> > > >> > used
> >> > > >> > >>> plain rsync's as an alternative to mirroring.
> >> > > >> > >>>
> >> > > >> > >>> Thanks,
> >> > > >> > >>>
> >> > > >> > >>> Joel
> >> > > >> > >>>
> >> > > >> > >
> >> > > >> >
> >> > > >>
> >> > >
> >> >
> >>
>

Re: Replication questions

Posted by Jay Kreps <ja...@gmail.com>.
Yes, it is also worth noting that there are couple of different ways
to think about latency:
1. latency of the request from the producer's point-of-view
2. end-to-end latency to the consumer

As Jun mentions (1) may go up a little because the producer was
sending data without checking for any answer from the server. Although
this gives a nice buffering effect it leads to a number of corner
cases that are hard to deal with correctly. It should be the case that
setting the consumer to async has the same effect from the producer
point of view without the corner cases of having no RPC response to
convey errors and other broker misbehavior.

(2) May actually get significantly better, especially for lower volume
topics. The reason for this is because currently we wait until data is
flushed to disk before giving it to the consumer, this flush policy is
controlled by setting a number of messages or timeout at which the
flush is forced. The reason to configure this is because on
traditional disks each disk is likely to incur at least one seek. In
the new model replication can take the place of waiting on a disk
flush to provide durability (even if the log of the local server loses
unflushed data as long as all servers don't crash at the same time no
messages will be lost). Doing 2 parallel replication round-trips
(perhaps surprisingly) looks like it may be a lot lower-latency than
doing a local disk flush (< 1ms versus >= 10ms). In our own usage
desire for this kind of low-latency consumption is not common, but I
understand that this is a common need for messaging.

-Jay

On Thu, Apr 26, 2012 at 2:03 PM, Felix GV <fe...@mate1inc.com> wrote:
> Thanks Jun :)
>
> --
> Felix
>
>
>
> On Thu, Apr 26, 2012 at 3:26 PM, Jun Rao <ju...@gmail.com> wrote:
>
>> Some comments inlined below.
>>
>> Thanks,
>>
>> Jun
>>
>> On Thu, Apr 26, 2012 at 10:27 AM, Felix GV <fe...@mate1inc.com> wrote:
>>
>> > Cool :) Thanks for those insights :) !
>> >
>> > I changed the subject of the thread, in order not to derail the original
>> > thread's subject...! I just want to recap to make sure I (and others)
>> > understand all of this correctly :)
>> >
>> > So, if I understand correctly, with acks == [0,1] Kafka should provide a
>> > latency that is similar to what we have now, but with the possibility of
>> > losing a small window of unreplicated events in the case of an
>> > unrecoverable hardware failure, and with acks > 1 (or acks == -1) there
>> > will probably be a latency penalty but we will be completely protected
>> from
>> > (non-correlated) hardware failures, right?
>> >
>> > This is mostly true. The difference is that in 0.7, producer doesn't wait
>> for a TCP response from broker. In 0.8, the producer always waits for a
>> response from broker. How quickly the broker sends the response depends on
>> acks. If acks is less than ideal, you may get the response faster, but have
>> some risk of losing the data if there is broker failure.
>>
>>
>> > Also, I guess the above assumptions are correct for a batch size of 1,
>> and
>> > that bigger batch sizes could also lead to small windows of unwritten
>> data
>> > in cases of failures, just like now...? Although, now that I think of
>> it, I
>> > guess the vulnerability of bigger batch sizes would, again, only come
>> into
>> > play in scenarios of unrecoverable correlated failures, since even if a
>> > machine fails with some partially committed batch, there would be other
>> > machines who received the same data (through replication) and would have
>> > enough time to commit those batches...
>> >
>> > I want to add that if the producer itself dies, it could lose a batch of
>> events.
>>
>>
>> > Finally, I guess that replication (whatever the ack parameter is) will
>> > affect the overall throughput capacity of the Kafka cluster, since every
>> > node will now be writing its own data as well as the replicated data from
>> > +/- 2 other nodes, right?
>> >
>> > --
>> > Felix
>> >
>> >
>> >
>> > On Wed, Apr 25, 2012 at 6:32 PM, Jay Kreps <ja...@gmail.com> wrote:
>> >
>> > > Short answer is yes, both async (acks=0 or 1) and sync replication
>> > > (acks > 1) will be both be supported.
>> > >
>> > > -Jay
>> > >
>> > > On Wed, Apr 25, 2012 at 11:22 AM, Jun Rao <ju...@gmail.com> wrote:
>> > > > Felix,
>> > > >
>> > > > Initially, we thought we could keep the option of not sending acks
>> from
>> > > the
>> > > > broker to the producer. However, this seems hard since in the new
>> wire
>> > > > protocol, we need to send at least the error code to the producer
>> > (e.g.,
>> > > a
>> > > > request is sent to the wrong broker or wrong partition).
>> > > >
>> > > > So, what we allow in the current design is the following. The
>> producer
>> > > can
>> > > > specify the # of acks in the request. By default (acks = -1), the
>> > broker
>> > > > will wait for the message to be written to all replicas that are
>> still
>> > > > synced up with the leader before acking the producer. Otherwise (acks
>> > > >=0),
>> > > > the broker will ack the producer after the message is written to acks
>> > > > replicas. Currently, acks=0 is treated the same as acks=1.
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Jun
>> > > >
>> > > > On Wed, Apr 25, 2012 at 10:39 AM, Felix GV <fe...@mate1inc.com>
>> wrote:
>> > > >
>> > > >> Just curious, but if I remember correctly from the time I read
>> > KAFKA-50
>> > > and
>> > > >> the related JIRA issues, you guys plan to implement sync AND async
>> > > >> replication, right?
>> > > >>
>> > > >> --
>> > > >> Felix
>> > > >>
>> > > >>
>> > > >>
>> > > >> On Tue, Apr 24, 2012 at 4:42 PM, Jay Kreps <ja...@gmail.com>
>> > wrote:
>> > > >>
>> > > >> > Right now we do sloppy failover. That is when a broker goes down
>> > > >> > traffic is redirected to the remaining machines, but any
>> unconsumed
>> > > >> > messages are stuck on that server until it comes back, if it is
>> > > >> > permanently gone the messages are lost. This is acceptable for us
>> in
>> > > >> > the near-term since our pipeline is pretty real-time so this
>> window
>> > > >> > between production and consumption is pretty small. The complete
>> > > >> > solution is the intra-cluster replication in KAFA-50 which is
>> coming
>> > > >> > along fairly nicely now that we are working on it.
>> > > >> >
>> > > >> > -Jay
>> > > >> >
>> > > >> > On Tue, Apr 24, 2012 at 12:21 PM, Oliver Krohne
>> > > >> > <ol...@googlemail.com> wrote:
>> > > >> > > Hi,
>> > > >> > >
>> > > >> > > indeed I thought could be used as failover approach.
>> > > >> > >
>> > > >> > > We use raid for local redundancy but it does not protect us in
>> > case
>> > > of
>> > > >> a
>> > > >> > machine failure, so I am looking for a way to achieve a
>> master/slave
>> > > >> setup
>> > > >> > until KAFKA-50 has been implemented.
>> > > >> > >
>> > > >> > > I think we can solve it for now by having multiple broker so
>> that
>> > > the
>> > > >> > application can continue sending messages if one broker goes down.
>> > My
>> > > >> main
>> > > >> > concern is to not introduce a new single point of failure which
>> can
>> > > stop
>> > > >> > the application. However as some consumer are not developed by us
>> > and
>> > > it
>> > > >> is
>> > > >> > not clear how they store the offset in zookeeper we need to find
>> out
>> > > how
>> > > >> we
>> > > >> > can manage the consumer in case a broker will never return after a
>> > > >> failure.
>> > > >> > It will be acceptable to lose a couple of messages if a broker
>> dies
>> > > and
>> > > >> the
>> > > >> > consumers have not consumed all messages at the point of failure.
>> > > >> > >
>> > > >> > > Thanks,
>> > > >> > > Oliver
>> > > >> > >
>> > > >> > >
>> > > >> > >
>> > > >> > >
>> > > >> > > Am 23.04.2012 um 19:58 schrieb Jay Kreps:
>> > > >> > >
>> > > >> > >> I think the confusion comes from the fact that we are using
>> > > mirroring
>> > > >> to
>> > > >> > >> handle geographic distribution not failover. If I understand
>> > > correctly
>> > > >> > what
>> > > >> > >> Oliver is asking for is something to give fault tolerance not
>> > > >> something
>> > > >> > for
>> > > >> > >> distribution. I don't think that is really what the mirroring
>> > does
>> > > out
>> > > >> > of
>> > > >> > >> the box, though technically i suppose you could just reset the
>> > > offsets
>> > > >> > and
>> > > >> > >> point the consumer at the new cluster and have it start from
>> > "now".
>> > > >> > >>
>> > > >> > >> I think it would be helpful to document our use case in the
>> > > mirroring
>> > > >> > docs
>> > > >> > >> since this is not the first time someone has asked about this.
>> > > >> > >>
>> > > >> > >> -Jay
>> > > >> > >>
>> > > >> > >> On Mon, Apr 23, 2012 at 10:38 AM, Joel Koshy <
>> > jjkoshy.w@gmail.com>
>> > > >> > wrote:
>> > > >> > >>
>> > > >> > >>> Hi Oliver,
>> > > >> > >>>
>> > > >> > >>> I was reading the mirroring guide and I wonder if it is
>> required
>> > > that
>> > > >> > the
>> > > >> > >>>> mirror runs it's own zookeeper?
>> > > >> > >>>>
>> > > >> > >>>> We have a zookeeper cluster running which is used by
>> different
>> > > >> > >>>> applications, so can we use that zookeeper cluster for the
>> > kafka
>> > > >> > source
>> > > >> > >>> and
>> > > >> > >>>> kafka mirror?
>> > > >> > >>>>
>> > > >> > >>>
>> > > >> > >>> You could have a single zookeeper cluster and use different
>> > > >> namespaces
>> > > >> > for
>> > > >> > >>> the source/target mirror. However, I don't think it is
>> > > recommended to
>> > > >> > use a
>> > > >> > >>> remote zookeeper (if you have a cross-DC set up) since that
>> > would
>> > > >> > >>> potentially mean very high ZK latencies on one of your
>> clusters.
>> > > >> > >>>
>> > > >> > >>>
>> > > >> > >>>> What is the procedure if the kafka source server fails to
>> > switch
>> > > the
>> > > >> > >>>> applications to use the mirrored instance?
>> > > >> > >>>>
>> > > >> > >>>
>> > > >> > >>> I don't quite follow this question - can you clarify? The
>> mirror
>> > > >> > cluster is
>> > > >> > >>> pretty much a separate instance. There is no built-in
>> automatic
>> > > >> > fail-over
>> > > >> > >>> if your source cluster goes down.
>> > > >> > >>>
>> > > >> > >>>
>> > > >> > >>>> Are there any backup best practices if we would not use
>> > > mirroring?
>> > > >> > >>>>
>> > > >> > >>>
>> > > >> > >>> You can use RAID arrays for (local) data redundancy. You may
>> > also
>> > > be
>> > > >> > >>> interested in the (intra-DC) replication feature (KAFKA-50)
>> that
>> > > is
>> > > >> > >>> currently being developed. I believe some folks on this list
>> > have
>> > > >> also
>> > > >> > used
>> > > >> > >>> plain rsync's as an alternative to mirroring.
>> > > >> > >>>
>> > > >> > >>> Thanks,
>> > > >> > >>>
>> > > >> > >>> Joel
>> > > >> > >>>
>> > > >> > >
>> > > >> >
>> > > >>
>> > >
>> >
>>

Re: Replication questions

Posted by Felix GV <fe...@mate1inc.com>.
Thanks Jun :)

--
Felix



On Thu, Apr 26, 2012 at 3:26 PM, Jun Rao <ju...@gmail.com> wrote:

> Some comments inlined below.
>
> Thanks,
>
> Jun
>
> On Thu, Apr 26, 2012 at 10:27 AM, Felix GV <fe...@mate1inc.com> wrote:
>
> > Cool :) Thanks for those insights :) !
> >
> > I changed the subject of the thread, in order not to derail the original
> > thread's subject...! I just want to recap to make sure I (and others)
> > understand all of this correctly :)
> >
> > So, if I understand correctly, with acks == [0,1] Kafka should provide a
> > latency that is similar to what we have now, but with the possibility of
> > losing a small window of unreplicated events in the case of an
> > unrecoverable hardware failure, and with acks > 1 (or acks == -1) there
> > will probably be a latency penalty but we will be completely protected
> from
> > (non-correlated) hardware failures, right?
> >
> > This is mostly true. The difference is that in 0.7, producer doesn't wait
> for a TCP response from broker. In 0.8, the producer always waits for a
> response from broker. How quickly the broker sends the response depends on
> acks. If acks is less than ideal, you may get the response faster, but have
> some risk of losing the data if there is broker failure.
>
>
> > Also, I guess the above assumptions are correct for a batch size of 1,
> and
> > that bigger batch sizes could also lead to small windows of unwritten
> data
> > in cases of failures, just like now...? Although, now that I think of
> it, I
> > guess the vulnerability of bigger batch sizes would, again, only come
> into
> > play in scenarios of unrecoverable correlated failures, since even if a
> > machine fails with some partially committed batch, there would be other
> > machines who received the same data (through replication) and would have
> > enough time to commit those batches...
> >
> > I want to add that if the producer itself dies, it could lose a batch of
> events.
>
>
> > Finally, I guess that replication (whatever the ack parameter is) will
> > affect the overall throughput capacity of the Kafka cluster, since every
> > node will now be writing its own data as well as the replicated data from
> > +/- 2 other nodes, right?
> >
> > --
> > Felix
> >
> >
> >
> > On Wed, Apr 25, 2012 at 6:32 PM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Short answer is yes, both async (acks=0 or 1) and sync replication
> > > (acks > 1) will be both be supported.
> > >
> > > -Jay
> > >
> > > On Wed, Apr 25, 2012 at 11:22 AM, Jun Rao <ju...@gmail.com> wrote:
> > > > Felix,
> > > >
> > > > Initially, we thought we could keep the option of not sending acks
> from
> > > the
> > > > broker to the producer. However, this seems hard since in the new
> wire
> > > > protocol, we need to send at least the error code to the producer
> > (e.g.,
> > > a
> > > > request is sent to the wrong broker or wrong partition).
> > > >
> > > > So, what we allow in the current design is the following. The
> producer
> > > can
> > > > specify the # of acks in the request. By default (acks = -1), the
> > broker
> > > > will wait for the message to be written to all replicas that are
> still
> > > > synced up with the leader before acking the producer. Otherwise (acks
> > > >=0),
> > > > the broker will ack the producer after the message is written to acks
> > > > replicas. Currently, acks=0 is treated the same as acks=1.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Wed, Apr 25, 2012 at 10:39 AM, Felix GV <fe...@mate1inc.com>
> wrote:
> > > >
> > > >> Just curious, but if I remember correctly from the time I read
> > KAFKA-50
> > > and
> > > >> the related JIRA issues, you guys plan to implement sync AND async
> > > >> replication, right?
> > > >>
> > > >> --
> > > >> Felix
> > > >>
> > > >>
> > > >>
> > > >> On Tue, Apr 24, 2012 at 4:42 PM, Jay Kreps <ja...@gmail.com>
> > wrote:
> > > >>
> > > >> > Right now we do sloppy failover. That is when a broker goes down
> > > >> > traffic is redirected to the remaining machines, but any
> unconsumed
> > > >> > messages are stuck on that server until it comes back, if it is
> > > >> > permanently gone the messages are lost. This is acceptable for us
> in
> > > >> > the near-term since our pipeline is pretty real-time so this
> window
> > > >> > between production and consumption is pretty small. The complete
> > > >> > solution is the intra-cluster replication in KAFA-50 which is
> coming
> > > >> > along fairly nicely now that we are working on it.
> > > >> >
> > > >> > -Jay
> > > >> >
> > > >> > On Tue, Apr 24, 2012 at 12:21 PM, Oliver Krohne
> > > >> > <ol...@googlemail.com> wrote:
> > > >> > > Hi,
> > > >> > >
> > > >> > > indeed I thought could be used as failover approach.
> > > >> > >
> > > >> > > We use raid for local redundancy but it does not protect us in
> > case
> > > of
> > > >> a
> > > >> > machine failure, so I am looking for a way to achieve a
> master/slave
> > > >> setup
> > > >> > until KAFKA-50 has been implemented.
> > > >> > >
> > > >> > > I think we can solve it for now by having multiple broker so
> that
> > > the
> > > >> > application can continue sending messages if one broker goes down.
> > My
> > > >> main
> > > >> > concern is to not introduce a new single point of failure which
> can
> > > stop
> > > >> > the application. However as some consumer are not developed by us
> > and
> > > it
> > > >> is
> > > >> > not clear how they store the offset in zookeeper we need to find
> out
> > > how
> > > >> we
> > > >> > can manage the consumer in case a broker will never return after a
> > > >> failure.
> > > >> > It will be acceptable to lose a couple of messages if a broker
> dies
> > > and
> > > >> the
> > > >> > consumers have not consumed all messages at the point of failure.
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Oliver
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > Am 23.04.2012 um 19:58 schrieb Jay Kreps:
> > > >> > >
> > > >> > >> I think the confusion comes from the fact that we are using
> > > mirroring
> > > >> to
> > > >> > >> handle geographic distribution not failover. If I understand
> > > correctly
> > > >> > what
> > > >> > >> Oliver is asking for is something to give fault tolerance not
> > > >> something
> > > >> > for
> > > >> > >> distribution. I don't think that is really what the mirroring
> > does
> > > out
> > > >> > of
> > > >> > >> the box, though technically i suppose you could just reset the
> > > offsets
> > > >> > and
> > > >> > >> point the consumer at the new cluster and have it start from
> > "now".
> > > >> > >>
> > > >> > >> I think it would be helpful to document our use case in the
> > > mirroring
> > > >> > docs
> > > >> > >> since this is not the first time someone has asked about this.
> > > >> > >>
> > > >> > >> -Jay
> > > >> > >>
> > > >> > >> On Mon, Apr 23, 2012 at 10:38 AM, Joel Koshy <
> > jjkoshy.w@gmail.com>
> > > >> > wrote:
> > > >> > >>
> > > >> > >>> Hi Oliver,
> > > >> > >>>
> > > >> > >>> I was reading the mirroring guide and I wonder if it is
> required
> > > that
> > > >> > the
> > > >> > >>>> mirror runs it's own zookeeper?
> > > >> > >>>>
> > > >> > >>>> We have a zookeeper cluster running which is used by
> different
> > > >> > >>>> applications, so can we use that zookeeper cluster for the
> > kafka
> > > >> > source
> > > >> > >>> and
> > > >> > >>>> kafka mirror?
> > > >> > >>>>
> > > >> > >>>
> > > >> > >>> You could have a single zookeeper cluster and use different
> > > >> namespaces
> > > >> > for
> > > >> > >>> the source/target mirror. However, I don't think it is
> > > recommended to
> > > >> > use a
> > > >> > >>> remote zookeeper (if you have a cross-DC set up) since that
> > would
> > > >> > >>> potentially mean very high ZK latencies on one of your
> clusters.
> > > >> > >>>
> > > >> > >>>
> > > >> > >>>> What is the procedure if the kafka source server fails to
> > switch
> > > the
> > > >> > >>>> applications to use the mirrored instance?
> > > >> > >>>>
> > > >> > >>>
> > > >> > >>> I don't quite follow this question - can you clarify? The
> mirror
> > > >> > cluster is
> > > >> > >>> pretty much a separate instance. There is no built-in
> automatic
> > > >> > fail-over
> > > >> > >>> if your source cluster goes down.
> > > >> > >>>
> > > >> > >>>
> > > >> > >>>> Are there any backup best practices if we would not use
> > > mirroring?
> > > >> > >>>>
> > > >> > >>>
> > > >> > >>> You can use RAID arrays for (local) data redundancy. You may
> > also
> > > be
> > > >> > >>> interested in the (intra-DC) replication feature (KAFKA-50)
> that
> > > is
> > > >> > >>> currently being developed. I believe some folks on this list
> > have
> > > >> also
> > > >> > used
> > > >> > >>> plain rsync's as an alternative to mirroring.
> > > >> > >>>
> > > >> > >>> Thanks,
> > > >> > >>>
> > > >> > >>> Joel
> > > >> > >>>
> > > >> > >
> > > >> >
> > > >>
> > >
> >
>

Re: Replication questions

Posted by Jun Rao <ju...@gmail.com>.
Some comments inlined below.

Thanks,

Jun

On Thu, Apr 26, 2012 at 10:27 AM, Felix GV <fe...@mate1inc.com> wrote:

> Cool :) Thanks for those insights :) !
>
> I changed the subject of the thread, in order not to derail the original
> thread's subject...! I just want to recap to make sure I (and others)
> understand all of this correctly :)
>
> So, if I understand correctly, with acks == [0,1] Kafka should provide a
> latency that is similar to what we have now, but with the possibility of
> losing a small window of unreplicated events in the case of an
> unrecoverable hardware failure, and with acks > 1 (or acks == -1) there
> will probably be a latency penalty but we will be completely protected from
> (non-correlated) hardware failures, right?
>
> This is mostly true. The difference is that in 0.7, producer doesn't wait
for a TCP response from broker. In 0.8, the producer always waits for a
response from broker. How quickly the broker sends the response depends on
acks. If acks is less than ideal, you may get the response faster, but have
some risk of losing the data if there is broker failure.


> Also, I guess the above assumptions are correct for a batch size of 1, and
> that bigger batch sizes could also lead to small windows of unwritten data
> in cases of failures, just like now...? Although, now that I think of it, I
> guess the vulnerability of bigger batch sizes would, again, only come into
> play in scenarios of unrecoverable correlated failures, since even if a
> machine fails with some partially committed batch, there would be other
> machines who received the same data (through replication) and would have
> enough time to commit those batches...
>
> I want to add that if the producer itself dies, it could lose a batch of
events.


> Finally, I guess that replication (whatever the ack parameter is) will
> affect the overall throughput capacity of the Kafka cluster, since every
> node will now be writing its own data as well as the replicated data from
> +/- 2 other nodes, right?
>
> --
> Felix
>
>
>
> On Wed, Apr 25, 2012 at 6:32 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > Short answer is yes, both async (acks=0 or 1) and sync replication
> > (acks > 1) will be both be supported.
> >
> > -Jay
> >
> > On Wed, Apr 25, 2012 at 11:22 AM, Jun Rao <ju...@gmail.com> wrote:
> > > Felix,
> > >
> > > Initially, we thought we could keep the option of not sending acks from
> > the
> > > broker to the producer. However, this seems hard since in the new wire
> > > protocol, we need to send at least the error code to the producer
> (e.g.,
> > a
> > > request is sent to the wrong broker or wrong partition).
> > >
> > > So, what we allow in the current design is the following. The producer
> > can
> > > specify the # of acks in the request. By default (acks = -1), the
> broker
> > > will wait for the message to be written to all replicas that are still
> > > synced up with the leader before acking the producer. Otherwise (acks
> > >=0),
> > > the broker will ack the producer after the message is written to acks
> > > replicas. Currently, acks=0 is treated the same as acks=1.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Apr 25, 2012 at 10:39 AM, Felix GV <fe...@mate1inc.com> wrote:
> > >
> > >> Just curious, but if I remember correctly from the time I read
> KAFKA-50
> > and
> > >> the related JIRA issues, you guys plan to implement sync AND async
> > >> replication, right?
> > >>
> > >> --
> > >> Felix
> > >>
> > >>
> > >>
> > >> On Tue, Apr 24, 2012 at 4:42 PM, Jay Kreps <ja...@gmail.com>
> wrote:
> > >>
> > >> > Right now we do sloppy failover. That is when a broker goes down
> > >> > traffic is redirected to the remaining machines, but any unconsumed
> > >> > messages are stuck on that server until it comes back, if it is
> > >> > permanently gone the messages are lost. This is acceptable for us in
> > >> > the near-term since our pipeline is pretty real-time so this window
> > >> > between production and consumption is pretty small. The complete
> > >> > solution is the intra-cluster replication in KAFA-50 which is coming
> > >> > along fairly nicely now that we are working on it.
> > >> >
> > >> > -Jay
> > >> >
> > >> > On Tue, Apr 24, 2012 at 12:21 PM, Oliver Krohne
> > >> > <ol...@googlemail.com> wrote:
> > >> > > Hi,
> > >> > >
> > >> > > indeed I thought could be used as failover approach.
> > >> > >
> > >> > > We use raid for local redundancy but it does not protect us in
> case
> > of
> > >> a
> > >> > machine failure, so I am looking for a way to achieve a master/slave
> > >> setup
> > >> > until KAFKA-50 has been implemented.
> > >> > >
> > >> > > I think we can solve it for now by having multiple broker so that
> > the
> > >> > application can continue sending messages if one broker goes down.
> My
> > >> main
> > >> > concern is to not introduce a new single point of failure which can
> > stop
> > >> > the application. However as some consumer are not developed by us
> and
> > it
> > >> is
> > >> > not clear how they store the offset in zookeeper we need to find out
> > how
> > >> we
> > >> > can manage the consumer in case a broker will never return after a
> > >> failure.
> > >> > It will be acceptable to lose a couple of messages if a broker dies
> > and
> > >> the
> > >> > consumers have not consumed all messages at the point of failure.
> > >> > >
> > >> > > Thanks,
> > >> > > Oliver
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > > Am 23.04.2012 um 19:58 schrieb Jay Kreps:
> > >> > >
> > >> > >> I think the confusion comes from the fact that we are using
> > mirroring
> > >> to
> > >> > >> handle geographic distribution not failover. If I understand
> > correctly
> > >> > what
> > >> > >> Oliver is asking for is something to give fault tolerance not
> > >> something
> > >> > for
> > >> > >> distribution. I don't think that is really what the mirroring
> does
> > out
> > >> > of
> > >> > >> the box, though technically i suppose you could just reset the
> > offsets
> > >> > and
> > >> > >> point the consumer at the new cluster and have it start from
> "now".
> > >> > >>
> > >> > >> I think it would be helpful to document our use case in the
> > mirroring
> > >> > docs
> > >> > >> since this is not the first time someone has asked about this.
> > >> > >>
> > >> > >> -Jay
> > >> > >>
> > >> > >> On Mon, Apr 23, 2012 at 10:38 AM, Joel Koshy <
> jjkoshy.w@gmail.com>
> > >> > wrote:
> > >> > >>
> > >> > >>> Hi Oliver,
> > >> > >>>
> > >> > >>> I was reading the mirroring guide and I wonder if it is required
> > that
> > >> > the
> > >> > >>>> mirror runs it's own zookeeper?
> > >> > >>>>
> > >> > >>>> We have a zookeeper cluster running which is used by different
> > >> > >>>> applications, so can we use that zookeeper cluster for the
> kafka
> > >> > source
> > >> > >>> and
> > >> > >>>> kafka mirror?
> > >> > >>>>
> > >> > >>>
> > >> > >>> You could have a single zookeeper cluster and use different
> > >> namespaces
> > >> > for
> > >> > >>> the source/target mirror. However, I don't think it is
> > recommended to
> > >> > use a
> > >> > >>> remote zookeeper (if you have a cross-DC set up) since that
> would
> > >> > >>> potentially mean very high ZK latencies on one of your clusters.
> > >> > >>>
> > >> > >>>
> > >> > >>>> What is the procedure if the kafka source server fails to
> switch
> > the
> > >> > >>>> applications to use the mirrored instance?
> > >> > >>>>
> > >> > >>>
> > >> > >>> I don't quite follow this question - can you clarify? The mirror
> > >> > cluster is
> > >> > >>> pretty much a separate instance. There is no built-in automatic
> > >> > fail-over
> > >> > >>> if your source cluster goes down.
> > >> > >>>
> > >> > >>>
> > >> > >>>> Are there any backup best practices if we would not use
> > mirroring?
> > >> > >>>>
> > >> > >>>
> > >> > >>> You can use RAID arrays for (local) data redundancy. You may
> also
> > be
> > >> > >>> interested in the (intra-DC) replication feature (KAFKA-50) that
> > is
> > >> > >>> currently being developed. I believe some folks on this list
> have
> > >> also
> > >> > used
> > >> > >>> plain rsync's as an alternative to mirroring.
> > >> > >>>
> > >> > >>> Thanks,
> > >> > >>>
> > >> > >>> Joel
> > >> > >>>
> > >> > >
> > >> >
> > >>
> >
>