You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "vlad.gm@gmail.com" <vl...@gmail.com> on 2015/03/24 19:44:12 UTC

MirrorMaker improvements

Dear all,

I had a short discussion with Jay yesterday at the ACM meetup and he
suggested writing an email regarding a few possible MirrorMaker
improvements.

At Turn, we have been using MirrorMaker for a a few months now to
asynchronously replicate our key/value store data between our datacenters.
In a way, our system is similar to Linkedin's Databus, but it uses Kafka
clusters and MirrorMaker as its building blocks. Our overall message rate
peaks at about 650K/sec and, when pushing data over high bandwidth delay
product links, we have found some minor bottlenecks.

The MirrorMaker process uses a standard consumer to pull data from a remote
datacenter. This implies that it opens a single TCP connection to each of
the remote brokers and muxes requests for different topics and partitions
over this connection. While this is a good thing in terms of maintaining
the congestion window open, over long RTT lines with rather high loss rate
the congestion window will cap, in our case at just a few Mbps. While the
overall line bandwidth is much higher, this means that we have to start
multiple MirrorMaker processes (somewhere in the hundreds), in order to
completely use the line capacity. Being able to pool multiple TCP
connections from a single consumer to a broker would solve this
complication.

The standard consumer also uses the remote ZooKeeper in order to manage the
consumer group. While consumer group management is moving closer to the
brokers, it might make sense to move the group management to the local
datacenter, since that would avoid using the long-distance connection for
this purpose.

Another possible improvement assumes a further constraint, namely that the
number of partitions for a topic in both datacenters is the same. In my
opinion, this is a sane constraint, since it preserves the Kafka ordering
guarantees (per partition), instead of a simple guarantee per key. This
kind of guarantee can be for example useful in a system that compares
partition contents to reach eventual consistency using Merkle trees. If the
number of partitions is equal, then offsets have the same meaning for the
same partition in both clusters, since the data for both partitions is
identical before the offset. This allows a simple consumer to just inquire
the local broker and the remote broker for their current offsets and, in
case the remote broker is ahead, copy the extra data to the local cluster.
Since the consumer offsets are no longer bound to the specific partitioning
of a single remote cluster, the consumer could pull from one of any number
of remote clusters, BitTorrent-style, if their offsets are ahead of the
local offset. The group management problem would reduce to assigning local
partitions to different MirrorMaker processes, so the group management
could be done locally also in this situation.

Regards,
Vlad

Re: MirrorMaker improvements

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks for sharing this Vlad, this is great read!

I am particularly interested about the last bullet point of one-to-one
mapping in MM since you also mentioned that you use Kafka MM as the async
replication layer for your geo-replicated k-v store. One approach that we
are pursuing here to support active-active is to use a aggregate cluster
that mirror from multiple local clusters from different data centers. But
this approach disables the one-to-one mapping since it requires multiple
sources to pipe to a single destination. How did you tackle this problem at
Turn if you are also using active-active?

Guozhang

On Tue, Mar 24, 2015 at 12:18 PM, vlad.gm@gmail.com <vl...@gmail.com>
wrote:

> Dear all,
>
> I had a short discussion with Jay yesterday at the ACM meetup and he
> suggested writing an email regarding a few possible MirrorMaker
> improvements.
>
> At Turn, we have been using MirrorMaker for a a few months now to
> asynchronously replicate our key/value store data between our datacenters.
> In a way, our system is similar to Linkedin's Databus, but it uses Kafka
> clusters and MirrorMaker as its building blocks. Our overall message rate
> peaks at about 650K/sec and, when pushing data over high bandwidth delay
> product links, we have found some minor bottlenecks.
>
> The MirrorMaker process uses a standard consumer to pull data from a remote
> datacenter. This implies that it opens a single TCP connection to each of
> the remote brokers and muxes requests for different topics and partitions
> over this connection. While this is a good thing in terms of maintaining
> the congestion window open, over long RTT lines with rather high loss rate
> the congestion window will cap, in our case at just a few Mbps. While the
> overall line bandwidth is much higher, this means that we have to start
> multiple MirrorMaker processes (somewhere in the hundreds), in order to
> completely use the line capacity. Being able to pool multiple TCP
> connections from a single consumer to a broker would solve this
> complication.
>
> The standard consumer also uses the remote ZooKeeper in order to manage the
> consumer group. While consumer group management is moving closer to the
> brokers, it might make sense to move the group management to the local
> datacenter, since that would avoid using the long-distance connection for
> this purpose.
>
> Another possible improvement assumes a further constraint, namely that the
> number of partitions for a topic in both datacenters is the same. In my
> opinion, this is a sane constraint, since it preserves the Kafka ordering
> guarantees (per partition), instead of a simple guarantee per key. This
> kind of guarantee can be for example useful in a system that compares
> partition contents to reach eventual consistency using Merkle trees. If the
> number of partitions is equal, then offsets have the same meaning for the
> same partition in both clusters, since the data for both partitions is
> identical before the offset. This allows a simple consumer to just inquire
> the local broker and the remote broker for their current offsets and, in
> case the remote broker is ahead, copy the extra data to the local cluster.
> Since the consumer offsets are no longer bound to the specific partitioning
> of a single remote cluster, the consumer could pull from one of any number
> of remote clusters, BitTorrent-style, if their offsets are ahead of the
> local offset. The group management problem would reduce to assigning local
> partitions to different MirrorMaker processes, so the group management
> could be done locally also in this situation.
>
> Regards,
> Vlad
>
> PS: Sorry if this is a double posting! The original posting did not appear
> in the archives for a while.
>



-- 
-- Guozhang

Fwd: MirrorMaker improvements

Posted by "vlad.gm@gmail.com" <vl...@gmail.com>.
Dear all,

I had a short discussion with Jay yesterday at the ACM meetup and he
suggested writing an email regarding a few possible MirrorMaker
improvements.

At Turn, we have been using MirrorMaker for a a few months now to
asynchronously replicate our key/value store data between our datacenters.
In a way, our system is similar to Linkedin's Databus, but it uses Kafka
clusters and MirrorMaker as its building blocks. Our overall message rate
peaks at about 650K/sec and, when pushing data over high bandwidth delay
product links, we have found some minor bottlenecks.

The MirrorMaker process uses a standard consumer to pull data from a remote
datacenter. This implies that it opens a single TCP connection to each of
the remote brokers and muxes requests for different topics and partitions
over this connection. While this is a good thing in terms of maintaining
the congestion window open, over long RTT lines with rather high loss rate
the congestion window will cap, in our case at just a few Mbps. While the
overall line bandwidth is much higher, this means that we have to start
multiple MirrorMaker processes (somewhere in the hundreds), in order to
completely use the line capacity. Being able to pool multiple TCP
connections from a single consumer to a broker would solve this
complication.

The standard consumer also uses the remote ZooKeeper in order to manage the
consumer group. While consumer group management is moving closer to the
brokers, it might make sense to move the group management to the local
datacenter, since that would avoid using the long-distance connection for
this purpose.

Another possible improvement assumes a further constraint, namely that the
number of partitions for a topic in both datacenters is the same. In my
opinion, this is a sane constraint, since it preserves the Kafka ordering
guarantees (per partition), instead of a simple guarantee per key. This
kind of guarantee can be for example useful in a system that compares
partition contents to reach eventual consistency using Merkle trees. If the
number of partitions is equal, then offsets have the same meaning for the
same partition in both clusters, since the data for both partitions is
identical before the offset. This allows a simple consumer to just inquire
the local broker and the remote broker for their current offsets and, in
case the remote broker is ahead, copy the extra data to the local cluster.
Since the consumer offsets are no longer bound to the specific partitioning
of a single remote cluster, the consumer could pull from one of any number
of remote clusters, BitTorrent-style, if their offsets are ahead of the
local offset. The group management problem would reduce to assigning local
partitions to different MirrorMaker processes, so the group management
could be done locally also in this situation.

Regards,
Vlad

PS: Sorry if this is a double posting! The original posting did not appear
in the archives for a while.

Re: MirrorMaker improvements

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
You can set the num.consumer.fetchers to be a larger number (e.g. 3) and
there will be more fetchers created to fetch from different partitions in
the same broker. Each fetcher will have there own TCP connection.


WRT the congestion window, yes, if the link has a high packet drop rate
then congestion avoidance will occur before the bandwidth get fully used.
I thought in normal cases, packets only got dropped occasionally even with
a long link (that’s why congestion avoidance only occur when there are 3
duplicated acks or a timeout). So if packet drop rate is high, it sounds
more like a link quality issue or the network is really congested - like
in the paper you mentioned, one of the internet router enforces drop-tail
policy because buffer is full due to bursty traffic.

Jiangjie (Becket) Qin

On 3/25/15, 4:08 PM, "vlad.gm@gmail.com" <vl...@gmail.com> wrote:

>Hi Jianqjie,
>
>I only noticed a single TCP connection between a MM process to a single
>broker. Is there something I could have done to open up more connections?
>
>TCP can actually cap before saturating the network, which is the reason
>for
>which it is hard to utilize a high bandwidth latency link with a single
>TCP
>connection. There is an equation that links the MSS, RTT and loss rate of
>the link to the TCP achievable throughput. Notice that the link bandwidth
>does not come into play, since the only way it can affect throughput is by
>increasing the loss rate due to drops when the link is congested. On WAN
>links, however, usually a single connection will cap (due to random losses
>and high RTT), long before achieving the capacity of the link. Here is a
>reference for this:
>http://www.ece.virginia.edu/~mv/edu/715/lectures/TCP/padhye98modeling.pdf
>
>Regards,
>Vlad
>
>On Wed, Mar 25, 2015 at 3:43 PM, Jiangjie Qin <jq...@linkedin.com.invalid>
>wrote:
>
>> Hi Vlad,
>>
>> I am not sure I understand the congestion window part. So TCP congestion
>> control will only occur when you are saturating the network. If that is
>> the case, bandwidth has already become the bottleneck. But we are
>>talking
>> about network under utilization, no?
>>
>> Another thing is that each fetcher thread has their own BlockingChannel
>>to
>> the broker, so they have dedicated TCP connections. Could you explain
>>more
>> on the Mux?
>>
>> Jiangjie (Becket) Qin
>>
>> On 3/25/15, 2:59 PM, "vlad.gm@gmail.com" <vl...@gmail.com> wrote:
>>
>> >@Guozhang
>> >We actually have separate topics depending on the source of the message
>> >and
>> >the multicast distribution group (the set of destinations). Our topics
>>are
>> >named: source_multicast-group. We do not aggregate data but we do
>>static
>> >routing based on the destination and the destination set (that is, we
>>set
>> >up a tree of mirrormakers to copy the topic from the original
>>datacenter
>> >to
>> >the others). This gives us a static topology (no path failure
>>resilience)
>> >and limits the number of multicast groups (since each multicase group
>> >needs
>> >a different topic for every source), but for our data replication
>>pattern
>> >is a good match. It also helps that the order of writes in our system
>>is
>> >not important, so we do not need a single point of aggregation :)
>> >
>> >@Jun
>> >The actual problem is the congestion window, I do not think that the we
>> >are
>> >suffering due to the transmit/receive socket buffers (we are using the
>> >same
>> >buffers over different links with similar RTT but different loss rates
>>and
>> >the TCP connection throughput varies a lot, this would not be the case
>>if
>> >the amount of in-flight data would be limited by buffer size). The
>> >socket-level cwnd metrics also support our hypothesis and we also have
>> >measured using iperf what a single connection can transport across a
>>lossy
>> >inter-DC link. Jianqie seems to be suggesting a different blocking
>> >scenario, similar to head-of-line blocking because of other requests,
>> >however increasing the number of fetchers will not necessarily help
>>since
>> >all fetchers will mux their request over a single TCP connection when
>> >sending requests to a single broker. The TCP connection's congestion
>> >window
>> >will continue to be the limiting factor. I would say that the only way
>>out
>> >of this is to pool multiple TCP connections from a single consumer to a
>> >broker.
>> >
>> >For identical mirroring, I thought that when asking for data between a
>> >pair
>> >of offsets the result should always be the same. Would it be possible
>>to
>> >produce also indicating the offsets where the data should go?
>> >
>> >Regards,
>> >Vlad
>> >
>> >On Wed, Mar 25, 2015 at 10:21 AM, Jiangjie Qin
>><jqin@linkedin.com.invalid
>> >
>> >wrote:
>> >
>> >> Hi Jun, I think bumping up socket.receive.buffer.bytes will help but
>> >>might
>> >> not be sufficient.
>> >> There are actually two related factors here:
>> >> 1. Pipelining TCP packets when send a single request/response.
>> >> 2. Pipelining multiple requests/responses
>> >> Bumping up socket.receive.buffer.bytes help with the 1) but does not
>> >>help
>> >> with 2).
>> >>
>> >> For example, consider the following scenario.
>> >> RTT = 100 ms
>> >> Bandwidth = 1 Gbps(128 MBps).
>> >> Request size = 10KB
>> >> Response size = 1MB
>> >> If we only have a single fetcher which is working in a blocking way.
>>The
>> >> max number of requests we can achieve is 10 requests/sec because its
>> >> restricted by the RTT. In this case, bumping up socket buffer size
>>will
>> >> not help. I think this is the situation Vlad mentioned.
>> >>
>> >> One option might be increase num.consumer.fetchers, so we might have
>> >>more
>> >> fetcher thread for a since consumer instance (due to the
>>implementation,
>> >> num.consumer.fetchers actually means "at most
>>num.consumer.fetchers²).
>> >>
>> >> One thing might worth considering is that can we enforce pipelining
>>in
>> >>new
>> >> consumer like we do for new producer. Since we have correlation ID,
>> >> reorder should be easily handled. I haven¹t got a chance to read the
>>new
>> >> consumer code, but I think it is worth doing if we haven¹t done so.
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >> On 3/25/15, 9:50 AM, "Jun Rao" <ju...@confluent.io> wrote:
>> >>
>> >> >To amortize the long RTT across data centers, you can tune the TCP
>> >>window
>> >> >size by configuring a larger socket.receive.buffer.bytes in the
>> >>consumer.
>> >> >
>> >> >For the last one, it seems that you want identical mirroring. The
>> >>tricky
>> >> >thing is to figure out how to avoid duplicates when there is a
>> >>failure. We
>> >> >had some related discussion in the context of transactional
>>messaging (
>> >> >
>> >>
>> >>
>> 
>>https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging
>> >>+
>> >> >in+Kafka
>> >> >).
>> >> >
>> >> >Thanks,
>> >> >
>> >> >Jun
>> >> >
>> >> >On Tue, Mar 24, 2015 at 11:44 AM, vlad.gm@gmail.com
>><vlad.gm@gmail.com
>> >
>> >> >wrote:
>> >> >
>> >> >> Dear all,
>> >> >>
>> >> >> I had a short discussion with Jay yesterday at the ACM meetup and
>>he
>> >> >> suggested writing an email regarding a few possible MirrorMaker
>> >> >> improvements.
>> >> >>
>> >> >> At Turn, we have been using MirrorMaker for a a few months now to
>> >> >> asynchronously replicate our key/value store data between our
>> >> >>datacenters.
>> >> >> In a way, our system is similar to Linkedin's Databus, but it uses
>> >>Kafka
>> >> >> clusters and MirrorMaker as its building blocks. Our overall
>>message
>> >> >>rate
>> >> >> peaks at about 650K/sec and, when pushing data over high bandwidth
>> >>delay
>> >> >> product links, we have found some minor bottlenecks.
>> >> >>
>> >> >> The MirrorMaker process uses a standard consumer to pull data
>>from a
>> >> >>remote
>> >> >> datacenter. This implies that it opens a single TCP connection to
>> >>each
>> >> >>of
>> >> >> the remote brokers and muxes requests for different topics and
>> >> >>partitions
>> >> >> over this connection. While this is a good thing in terms of
>> >>maintaining
>> >> >> the congestion window open, over long RTT lines with rather high
>>loss
>> >> >>rate
>> >> >> the congestion window will cap, in our case at just a few Mbps.
>>While
>> >> >>the
>> >> >> overall line bandwidth is much higher, this means that we have to
>> >>start
>> >> >> multiple MirrorMaker processes (somewhere in the hundreds), in
>>order
>> >>to
>> >> >> completely use the line capacity. Being able to pool multiple TCP
>> >> >> connections from a single consumer to a broker would solve this
>> >> >> complication.
>> >> >>
>> >> >> The standard consumer also uses the remote ZooKeeper in order to
>> >>manage
>> >> >>the
>> >> >> consumer group. While consumer group management is moving closer
>>to
>> >>the
>> >> >> brokers, it might make sense to move the group management to the
>> >>local
>> >> >> datacenter, since that would avoid using the long-distance
>>connection
>> >> >>for
>> >> >> this purpose.
>> >> >>
>> >> >> Another possible improvement assumes a further constraint, namely
>> >>that
>> >> >>the
>> >> >> number of partitions for a topic in both datacenters is the same.
>>In
>> >>my
>> >> >> opinion, this is a sane constraint, since it preserves the Kafka
>> >> >>ordering
>> >> >> guarantees (per partition), instead of a simple guarantee per key.
>> >>This
>> >> >> kind of guarantee can be for example useful in a system that
>>compares
>> >> >> partition contents to reach eventual consistency using Merkle
>>trees.
>> >>If
>> >> >>the
>> >> >> number of partitions is equal, then offsets have the same meaning
>>for
>> >> >>the
>> >> >> same partition in both clusters, since the data for both
>>partitions
>> >>is
>> >> >> identical before the offset. This allows a simple consumer to just
>> >> >>inquire
>> >> >> the local broker and the remote broker for their current offsets
>> >>and, in
>> >> >> case the remote broker is ahead, copy the extra data to the local
>> >> >>cluster.
>> >> >> Since the consumer offsets are no longer bound to the specific
>> >> >>partitioning
>> >> >> of a single remote cluster, the consumer could pull from one of
>>any
>> >> >>number
>> >> >> of remote clusters, BitTorrent-style, if their offsets are ahead
>>of
>> >>the
>> >> >> local offset. The group management problem would reduce to
>>assigning
>> >> >>local
>> >> >> partitions to different MirrorMaker processes, so the group
>> >>management
>> >> >> could be done locally also in this situation.
>> >> >>
>> >> >> Regards,
>> >> >> Vlad
>> >> >>
>> >>
>> >>
>>
>>


Re: MirrorMaker improvements

Posted by "vlad.gm@gmail.com" <vl...@gmail.com>.
Hi Jianqjie,

I only noticed a single TCP connection between a MM process to a single
broker. Is there something I could have done to open up more connections?

TCP can actually cap before saturating the network, which is the reason for
which it is hard to utilize a high bandwidth latency link with a single TCP
connection. There is an equation that links the MSS, RTT and loss rate of
the link to the TCP achievable throughput. Notice that the link bandwidth
does not come into play, since the only way it can affect throughput is by
increasing the loss rate due to drops when the link is congested. On WAN
links, however, usually a single connection will cap (due to random losses
and high RTT), long before achieving the capacity of the link. Here is a
reference for this:
http://www.ece.virginia.edu/~mv/edu/715/lectures/TCP/padhye98modeling.pdf

Regards,
Vlad

On Wed, Mar 25, 2015 at 3:43 PM, Jiangjie Qin <jq...@linkedin.com.invalid>
wrote:

> Hi Vlad,
>
> I am not sure I understand the congestion window part. So TCP congestion
> control will only occur when you are saturating the network. If that is
> the case, bandwidth has already become the bottleneck. But we are talking
> about network under utilization, no?
>
> Another thing is that each fetcher thread has their own BlockingChannel to
> the broker, so they have dedicated TCP connections. Could you explain more
> on the Mux?
>
> Jiangjie (Becket) Qin
>
> On 3/25/15, 2:59 PM, "vlad.gm@gmail.com" <vl...@gmail.com> wrote:
>
> >@Guozhang
> >We actually have separate topics depending on the source of the message
> >and
> >the multicast distribution group (the set of destinations). Our topics are
> >named: source_multicast-group. We do not aggregate data but we do static
> >routing based on the destination and the destination set (that is, we set
> >up a tree of mirrormakers to copy the topic from the original datacenter
> >to
> >the others). This gives us a static topology (no path failure resilience)
> >and limits the number of multicast groups (since each multicase group
> >needs
> >a different topic for every source), but for our data replication pattern
> >is a good match. It also helps that the order of writes in our system is
> >not important, so we do not need a single point of aggregation :)
> >
> >@Jun
> >The actual problem is the congestion window, I do not think that the we
> >are
> >suffering due to the transmit/receive socket buffers (we are using the
> >same
> >buffers over different links with similar RTT but different loss rates and
> >the TCP connection throughput varies a lot, this would not be the case if
> >the amount of in-flight data would be limited by buffer size). The
> >socket-level cwnd metrics also support our hypothesis and we also have
> >measured using iperf what a single connection can transport across a lossy
> >inter-DC link. Jianqie seems to be suggesting a different blocking
> >scenario, similar to head-of-line blocking because of other requests,
> >however increasing the number of fetchers will not necessarily help since
> >all fetchers will mux their request over a single TCP connection when
> >sending requests to a single broker. The TCP connection's congestion
> >window
> >will continue to be the limiting factor. I would say that the only way out
> >of this is to pool multiple TCP connections from a single consumer to a
> >broker.
> >
> >For identical mirroring, I thought that when asking for data between a
> >pair
> >of offsets the result should always be the same. Would it be possible to
> >produce also indicating the offsets where the data should go?
> >
> >Regards,
> >Vlad
> >
> >On Wed, Mar 25, 2015 at 10:21 AM, Jiangjie Qin <jqin@linkedin.com.invalid
> >
> >wrote:
> >
> >> Hi Jun, I think bumping up socket.receive.buffer.bytes will help but
> >>might
> >> not be sufficient.
> >> There are actually two related factors here:
> >> 1. Pipelining TCP packets when send a single request/response.
> >> 2. Pipelining multiple requests/responses
> >> Bumping up socket.receive.buffer.bytes help with the 1) but does not
> >>help
> >> with 2).
> >>
> >> For example, consider the following scenario.
> >> RTT = 100 ms
> >> Bandwidth = 1 Gbps(128 MBps).
> >> Request size = 10KB
> >> Response size = 1MB
> >> If we only have a single fetcher which is working in a blocking way. The
> >> max number of requests we can achieve is 10 requests/sec because its
> >> restricted by the RTT. In this case, bumping up socket buffer size will
> >> not help. I think this is the situation Vlad mentioned.
> >>
> >> One option might be increase num.consumer.fetchers, so we might have
> >>more
> >> fetcher thread for a since consumer instance (due to the implementation,
> >> num.consumer.fetchers actually means "at most num.consumer.fetchers²).
> >>
> >> One thing might worth considering is that can we enforce pipelining in
> >>new
> >> consumer like we do for new producer. Since we have correlation ID,
> >> reorder should be easily handled. I haven¹t got a chance to read the new
> >> consumer code, but I think it is worth doing if we haven¹t done so.
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 3/25/15, 9:50 AM, "Jun Rao" <ju...@confluent.io> wrote:
> >>
> >> >To amortize the long RTT across data centers, you can tune the TCP
> >>window
> >> >size by configuring a larger socket.receive.buffer.bytes in the
> >>consumer.
> >> >
> >> >For the last one, it seems that you want identical mirroring. The
> >>tricky
> >> >thing is to figure out how to avoid duplicates when there is a
> >>failure. We
> >> >had some related discussion in the context of transactional messaging (
> >> >
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging
> >>+
> >> >in+Kafka
> >> >).
> >> >
> >> >Thanks,
> >> >
> >> >Jun
> >> >
> >> >On Tue, Mar 24, 2015 at 11:44 AM, vlad.gm@gmail.com <vlad.gm@gmail.com
> >
> >> >wrote:
> >> >
> >> >> Dear all,
> >> >>
> >> >> I had a short discussion with Jay yesterday at the ACM meetup and he
> >> >> suggested writing an email regarding a few possible MirrorMaker
> >> >> improvements.
> >> >>
> >> >> At Turn, we have been using MirrorMaker for a a few months now to
> >> >> asynchronously replicate our key/value store data between our
> >> >>datacenters.
> >> >> In a way, our system is similar to Linkedin's Databus, but it uses
> >>Kafka
> >> >> clusters and MirrorMaker as its building blocks. Our overall message
> >> >>rate
> >> >> peaks at about 650K/sec and, when pushing data over high bandwidth
> >>delay
> >> >> product links, we have found some minor bottlenecks.
> >> >>
> >> >> The MirrorMaker process uses a standard consumer to pull data from a
> >> >>remote
> >> >> datacenter. This implies that it opens a single TCP connection to
> >>each
> >> >>of
> >> >> the remote brokers and muxes requests for different topics and
> >> >>partitions
> >> >> over this connection. While this is a good thing in terms of
> >>maintaining
> >> >> the congestion window open, over long RTT lines with rather high loss
> >> >>rate
> >> >> the congestion window will cap, in our case at just a few Mbps. While
> >> >>the
> >> >> overall line bandwidth is much higher, this means that we have to
> >>start
> >> >> multiple MirrorMaker processes (somewhere in the hundreds), in order
> >>to
> >> >> completely use the line capacity. Being able to pool multiple TCP
> >> >> connections from a single consumer to a broker would solve this
> >> >> complication.
> >> >>
> >> >> The standard consumer also uses the remote ZooKeeper in order to
> >>manage
> >> >>the
> >> >> consumer group. While consumer group management is moving closer to
> >>the
> >> >> brokers, it might make sense to move the group management to the
> >>local
> >> >> datacenter, since that would avoid using the long-distance connection
> >> >>for
> >> >> this purpose.
> >> >>
> >> >> Another possible improvement assumes a further constraint, namely
> >>that
> >> >>the
> >> >> number of partitions for a topic in both datacenters is the same. In
> >>my
> >> >> opinion, this is a sane constraint, since it preserves the Kafka
> >> >>ordering
> >> >> guarantees (per partition), instead of a simple guarantee per key.
> >>This
> >> >> kind of guarantee can be for example useful in a system that compares
> >> >> partition contents to reach eventual consistency using Merkle trees.
> >>If
> >> >>the
> >> >> number of partitions is equal, then offsets have the same meaning for
> >> >>the
> >> >> same partition in both clusters, since the data for both partitions
> >>is
> >> >> identical before the offset. This allows a simple consumer to just
> >> >>inquire
> >> >> the local broker and the remote broker for their current offsets
> >>and, in
> >> >> case the remote broker is ahead, copy the extra data to the local
> >> >>cluster.
> >> >> Since the consumer offsets are no longer bound to the specific
> >> >>partitioning
> >> >> of a single remote cluster, the consumer could pull from one of any
> >> >>number
> >> >> of remote clusters, BitTorrent-style, if their offsets are ahead of
> >>the
> >> >> local offset. The group management problem would reduce to assigning
> >> >>local
> >> >> partitions to different MirrorMaker processes, so the group
> >>management
> >> >> could be done locally also in this situation.
> >> >>
> >> >> Regards,
> >> >> Vlad
> >> >>
> >>
> >>
>
>

Re: MirrorMaker improvements

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
Hi Vlad,

I am not sure I understand the congestion window part. So TCP congestion
control will only occur when you are saturating the network. If that is
the case, bandwidth has already become the bottleneck. But we are talking
about network under utilization, no?

Another thing is that each fetcher thread has their own BlockingChannel to
the broker, so they have dedicated TCP connections. Could you explain more
on the Mux?

Jiangjie (Becket) Qin

On 3/25/15, 2:59 PM, "vlad.gm@gmail.com" <vl...@gmail.com> wrote:

>@Guozhang
>We actually have separate topics depending on the source of the message
>and
>the multicast distribution group (the set of destinations). Our topics are
>named: source_multicast-group. We do not aggregate data but we do static
>routing based on the destination and the destination set (that is, we set
>up a tree of mirrormakers to copy the topic from the original datacenter
>to
>the others). This gives us a static topology (no path failure resilience)
>and limits the number of multicast groups (since each multicase group
>needs
>a different topic for every source), but for our data replication pattern
>is a good match. It also helps that the order of writes in our system is
>not important, so we do not need a single point of aggregation :)
>
>@Jun
>The actual problem is the congestion window, I do not think that the we
>are
>suffering due to the transmit/receive socket buffers (we are using the
>same
>buffers over different links with similar RTT but different loss rates and
>the TCP connection throughput varies a lot, this would not be the case if
>the amount of in-flight data would be limited by buffer size). The
>socket-level cwnd metrics also support our hypothesis and we also have
>measured using iperf what a single connection can transport across a lossy
>inter-DC link. Jianqie seems to be suggesting a different blocking
>scenario, similar to head-of-line blocking because of other requests,
>however increasing the number of fetchers will not necessarily help since
>all fetchers will mux their request over a single TCP connection when
>sending requests to a single broker. The TCP connection's congestion
>window
>will continue to be the limiting factor. I would say that the only way out
>of this is to pool multiple TCP connections from a single consumer to a
>broker.
>
>For identical mirroring, I thought that when asking for data between a
>pair
>of offsets the result should always be the same. Would it be possible to
>produce also indicating the offsets where the data should go?
>
>Regards,
>Vlad
>
>On Wed, Mar 25, 2015 at 10:21 AM, Jiangjie Qin <jq...@linkedin.com.invalid>
>wrote:
>
>> Hi Jun, I think bumping up socket.receive.buffer.bytes will help but
>>might
>> not be sufficient.
>> There are actually two related factors here:
>> 1. Pipelining TCP packets when send a single request/response.
>> 2. Pipelining multiple requests/responses
>> Bumping up socket.receive.buffer.bytes help with the 1) but does not
>>help
>> with 2).
>>
>> For example, consider the following scenario.
>> RTT = 100 ms
>> Bandwidth = 1 Gbps(128 MBps).
>> Request size = 10KB
>> Response size = 1MB
>> If we only have a single fetcher which is working in a blocking way. The
>> max number of requests we can achieve is 10 requests/sec because its
>> restricted by the RTT. In this case, bumping up socket buffer size will
>> not help. I think this is the situation Vlad mentioned.
>>
>> One option might be increase num.consumer.fetchers, so we might have
>>more
>> fetcher thread for a since consumer instance (due to the implementation,
>> num.consumer.fetchers actually means "at most num.consumer.fetchers²).
>>
>> One thing might worth considering is that can we enforce pipelining in
>>new
>> consumer like we do for new producer. Since we have correlation ID,
>> reorder should be easily handled. I haven¹t got a chance to read the new
>> consumer code, but I think it is worth doing if we haven¹t done so.
>>
>> Jiangjie (Becket) Qin
>>
>> On 3/25/15, 9:50 AM, "Jun Rao" <ju...@confluent.io> wrote:
>>
>> >To amortize the long RTT across data centers, you can tune the TCP
>>window
>> >size by configuring a larger socket.receive.buffer.bytes in the
>>consumer.
>> >
>> >For the last one, it seems that you want identical mirroring. The
>>tricky
>> >thing is to figure out how to avoid duplicates when there is a
>>failure. We
>> >had some related discussion in the context of transactional messaging (
>> >
>> 
>>https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging
>>+
>> >in+Kafka
>> >).
>> >
>> >Thanks,
>> >
>> >Jun
>> >
>> >On Tue, Mar 24, 2015 at 11:44 AM, vlad.gm@gmail.com <vl...@gmail.com>
>> >wrote:
>> >
>> >> Dear all,
>> >>
>> >> I had a short discussion with Jay yesterday at the ACM meetup and he
>> >> suggested writing an email regarding a few possible MirrorMaker
>> >> improvements.
>> >>
>> >> At Turn, we have been using MirrorMaker for a a few months now to
>> >> asynchronously replicate our key/value store data between our
>> >>datacenters.
>> >> In a way, our system is similar to Linkedin's Databus, but it uses
>>Kafka
>> >> clusters and MirrorMaker as its building blocks. Our overall message
>> >>rate
>> >> peaks at about 650K/sec and, when pushing data over high bandwidth
>>delay
>> >> product links, we have found some minor bottlenecks.
>> >>
>> >> The MirrorMaker process uses a standard consumer to pull data from a
>> >>remote
>> >> datacenter. This implies that it opens a single TCP connection to
>>each
>> >>of
>> >> the remote brokers and muxes requests for different topics and
>> >>partitions
>> >> over this connection. While this is a good thing in terms of
>>maintaining
>> >> the congestion window open, over long RTT lines with rather high loss
>> >>rate
>> >> the congestion window will cap, in our case at just a few Mbps. While
>> >>the
>> >> overall line bandwidth is much higher, this means that we have to
>>start
>> >> multiple MirrorMaker processes (somewhere in the hundreds), in order
>>to
>> >> completely use the line capacity. Being able to pool multiple TCP
>> >> connections from a single consumer to a broker would solve this
>> >> complication.
>> >>
>> >> The standard consumer also uses the remote ZooKeeper in order to
>>manage
>> >>the
>> >> consumer group. While consumer group management is moving closer to
>>the
>> >> brokers, it might make sense to move the group management to the
>>local
>> >> datacenter, since that would avoid using the long-distance connection
>> >>for
>> >> this purpose.
>> >>
>> >> Another possible improvement assumes a further constraint, namely
>>that
>> >>the
>> >> number of partitions for a topic in both datacenters is the same. In
>>my
>> >> opinion, this is a sane constraint, since it preserves the Kafka
>> >>ordering
>> >> guarantees (per partition), instead of a simple guarantee per key.
>>This
>> >> kind of guarantee can be for example useful in a system that compares
>> >> partition contents to reach eventual consistency using Merkle trees.
>>If
>> >>the
>> >> number of partitions is equal, then offsets have the same meaning for
>> >>the
>> >> same partition in both clusters, since the data for both partitions
>>is
>> >> identical before the offset. This allows a simple consumer to just
>> >>inquire
>> >> the local broker and the remote broker for their current offsets
>>and, in
>> >> case the remote broker is ahead, copy the extra data to the local
>> >>cluster.
>> >> Since the consumer offsets are no longer bound to the specific
>> >>partitioning
>> >> of a single remote cluster, the consumer could pull from one of any
>> >>number
>> >> of remote clusters, BitTorrent-style, if their offsets are ahead of
>>the
>> >> local offset. The group management problem would reduce to assigning
>> >>local
>> >> partitions to different MirrorMaker processes, so the group
>>management
>> >> could be done locally also in this situation.
>> >>
>> >> Regards,
>> >> Vlad
>> >>
>>
>>


Re: MirrorMaker improvements

Posted by "vlad.gm@gmail.com" <vl...@gmail.com>.
@Guozhang
We actually have separate topics depending on the source of the message and
the multicast distribution group (the set of destinations). Our topics are
named: source_multicast-group. We do not aggregate data but we do static
routing based on the destination and the destination set (that is, we set
up a tree of mirrormakers to copy the topic from the original datacenter to
the others). This gives us a static topology (no path failure resilience)
and limits the number of multicast groups (since each multicase group needs
a different topic for every source), but for our data replication pattern
is a good match. It also helps that the order of writes in our system is
not important, so we do not need a single point of aggregation :)

@Jun
The actual problem is the congestion window, I do not think that the we are
suffering due to the transmit/receive socket buffers (we are using the same
buffers over different links with similar RTT but different loss rates and
the TCP connection throughput varies a lot, this would not be the case if
the amount of in-flight data would be limited by buffer size). The
socket-level cwnd metrics also support our hypothesis and we also have
measured using iperf what a single connection can transport across a lossy
inter-DC link. Jianqie seems to be suggesting a different blocking
scenario, similar to head-of-line blocking because of other requests,
however increasing the number of fetchers will not necessarily help since
all fetchers will mux their request over a single TCP connection when
sending requests to a single broker. The TCP connection's congestion window
will continue to be the limiting factor. I would say that the only way out
of this is to pool multiple TCP connections from a single consumer to a
broker.

For identical mirroring, I thought that when asking for data between a pair
of offsets the result should always be the same. Would it be possible to
produce also indicating the offsets where the data should go?

Regards,
Vlad

On Wed, Mar 25, 2015 at 10:21 AM, Jiangjie Qin <jq...@linkedin.com.invalid>
wrote:

> Hi Jun, I think bumping up socket.receive.buffer.bytes will help but might
> not be sufficient.
> There are actually two related factors here:
> 1. Pipelining TCP packets when send a single request/response.
> 2. Pipelining multiple requests/responses
> Bumping up socket.receive.buffer.bytes help with the 1) but does not help
> with 2).
>
> For example, consider the following scenario.
> RTT = 100 ms
> Bandwidth = 1 Gbps(128 MBps).
> Request size = 10KB
> Response size = 1MB
> If we only have a single fetcher which is working in a blocking way. The
> max number of requests we can achieve is 10 requests/sec because its
> restricted by the RTT. In this case, bumping up socket buffer size will
> not help. I think this is the situation Vlad mentioned.
>
> One option might be increase num.consumer.fetchers, so we might have more
> fetcher thread for a since consumer instance (due to the implementation,
> num.consumer.fetchers actually means "at most num.consumer.fetchers²).
>
> One thing might worth considering is that can we enforce pipelining in new
> consumer like we do for new producer. Since we have correlation ID,
> reorder should be easily handled. I haven¹t got a chance to read the new
> consumer code, but I think it is worth doing if we haven¹t done so.
>
> Jiangjie (Becket) Qin
>
> On 3/25/15, 9:50 AM, "Jun Rao" <ju...@confluent.io> wrote:
>
> >To amortize the long RTT across data centers, you can tune the TCP window
> >size by configuring a larger socket.receive.buffer.bytes in the consumer.
> >
> >For the last one, it seems that you want identical mirroring. The tricky
> >thing is to figure out how to avoid duplicates when there is a failure. We
> >had some related discussion in the context of transactional messaging (
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+
> >in+Kafka
> >).
> >
> >Thanks,
> >
> >Jun
> >
> >On Tue, Mar 24, 2015 at 11:44 AM, vlad.gm@gmail.com <vl...@gmail.com>
> >wrote:
> >
> >> Dear all,
> >>
> >> I had a short discussion with Jay yesterday at the ACM meetup and he
> >> suggested writing an email regarding a few possible MirrorMaker
> >> improvements.
> >>
> >> At Turn, we have been using MirrorMaker for a a few months now to
> >> asynchronously replicate our key/value store data between our
> >>datacenters.
> >> In a way, our system is similar to Linkedin's Databus, but it uses Kafka
> >> clusters and MirrorMaker as its building blocks. Our overall message
> >>rate
> >> peaks at about 650K/sec and, when pushing data over high bandwidth delay
> >> product links, we have found some minor bottlenecks.
> >>
> >> The MirrorMaker process uses a standard consumer to pull data from a
> >>remote
> >> datacenter. This implies that it opens a single TCP connection to each
> >>of
> >> the remote brokers and muxes requests for different topics and
> >>partitions
> >> over this connection. While this is a good thing in terms of maintaining
> >> the congestion window open, over long RTT lines with rather high loss
> >>rate
> >> the congestion window will cap, in our case at just a few Mbps. While
> >>the
> >> overall line bandwidth is much higher, this means that we have to start
> >> multiple MirrorMaker processes (somewhere in the hundreds), in order to
> >> completely use the line capacity. Being able to pool multiple TCP
> >> connections from a single consumer to a broker would solve this
> >> complication.
> >>
> >> The standard consumer also uses the remote ZooKeeper in order to manage
> >>the
> >> consumer group. While consumer group management is moving closer to the
> >> brokers, it might make sense to move the group management to the local
> >> datacenter, since that would avoid using the long-distance connection
> >>for
> >> this purpose.
> >>
> >> Another possible improvement assumes a further constraint, namely that
> >>the
> >> number of partitions for a topic in both datacenters is the same. In my
> >> opinion, this is a sane constraint, since it preserves the Kafka
> >>ordering
> >> guarantees (per partition), instead of a simple guarantee per key. This
> >> kind of guarantee can be for example useful in a system that compares
> >> partition contents to reach eventual consistency using Merkle trees. If
> >>the
> >> number of partitions is equal, then offsets have the same meaning for
> >>the
> >> same partition in both clusters, since the data for both partitions is
> >> identical before the offset. This allows a simple consumer to just
> >>inquire
> >> the local broker and the remote broker for their current offsets and, in
> >> case the remote broker is ahead, copy the extra data to the local
> >>cluster.
> >> Since the consumer offsets are no longer bound to the specific
> >>partitioning
> >> of a single remote cluster, the consumer could pull from one of any
> >>number
> >> of remote clusters, BitTorrent-style, if their offsets are ahead of the
> >> local offset. The group management problem would reduce to assigning
> >>local
> >> partitions to different MirrorMaker processes, so the group management
> >> could be done locally also in this situation.
> >>
> >> Regards,
> >> Vlad
> >>
>
>

Re: MirrorMaker improvements

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
Hi Jun, I think bumping up socket.receive.buffer.bytes will help but might
not be sufficient.
There are actually two related factors here:
1. Pipelining TCP packets when send a single request/response.
2. Pipelining multiple requests/responses
Bumping up socket.receive.buffer.bytes help with the 1) but does not help
with 2).

For example, consider the following scenario.
RTT = 100 ms
Bandwidth = 1 Gbps(128 MBps).
Request size = 10KB
Response size = 1MB
If we only have a single fetcher which is working in a blocking way. The
max number of requests we can achieve is 10 requests/sec because its
restricted by the RTT. In this case, bumping up socket buffer size will
not help. I think this is the situation Vlad mentioned.

One option might be increase num.consumer.fetchers, so we might have more
fetcher thread for a since consumer instance (due to the implementation,
num.consumer.fetchers actually means "at most num.consumer.fetchers²).

One thing might worth considering is that can we enforce pipelining in new
consumer like we do for new producer. Since we have correlation ID,
reorder should be easily handled. I haven¹t got a chance to read the new
consumer code, but I think it is worth doing if we haven¹t done so.

Jiangjie (Becket) Qin

On 3/25/15, 9:50 AM, "Jun Rao" <ju...@confluent.io> wrote:

>To amortize the long RTT across data centers, you can tune the TCP window
>size by configuring a larger socket.receive.buffer.bytes in the consumer.
>
>For the last one, it seems that you want identical mirroring. The tricky
>thing is to figure out how to avoid duplicates when there is a failure. We
>had some related discussion in the context of transactional messaging (
>https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+
>in+Kafka
>).
>
>Thanks,
>
>Jun
>
>On Tue, Mar 24, 2015 at 11:44 AM, vlad.gm@gmail.com <vl...@gmail.com>
>wrote:
>
>> Dear all,
>>
>> I had a short discussion with Jay yesterday at the ACM meetup and he
>> suggested writing an email regarding a few possible MirrorMaker
>> improvements.
>>
>> At Turn, we have been using MirrorMaker for a a few months now to
>> asynchronously replicate our key/value store data between our
>>datacenters.
>> In a way, our system is similar to Linkedin's Databus, but it uses Kafka
>> clusters and MirrorMaker as its building blocks. Our overall message
>>rate
>> peaks at about 650K/sec and, when pushing data over high bandwidth delay
>> product links, we have found some minor bottlenecks.
>>
>> The MirrorMaker process uses a standard consumer to pull data from a
>>remote
>> datacenter. This implies that it opens a single TCP connection to each
>>of
>> the remote brokers and muxes requests for different topics and
>>partitions
>> over this connection. While this is a good thing in terms of maintaining
>> the congestion window open, over long RTT lines with rather high loss
>>rate
>> the congestion window will cap, in our case at just a few Mbps. While
>>the
>> overall line bandwidth is much higher, this means that we have to start
>> multiple MirrorMaker processes (somewhere in the hundreds), in order to
>> completely use the line capacity. Being able to pool multiple TCP
>> connections from a single consumer to a broker would solve this
>> complication.
>>
>> The standard consumer also uses the remote ZooKeeper in order to manage
>>the
>> consumer group. While consumer group management is moving closer to the
>> brokers, it might make sense to move the group management to the local
>> datacenter, since that would avoid using the long-distance connection
>>for
>> this purpose.
>>
>> Another possible improvement assumes a further constraint, namely that
>>the
>> number of partitions for a topic in both datacenters is the same. In my
>> opinion, this is a sane constraint, since it preserves the Kafka
>>ordering
>> guarantees (per partition), instead of a simple guarantee per key. This
>> kind of guarantee can be for example useful in a system that compares
>> partition contents to reach eventual consistency using Merkle trees. If
>>the
>> number of partitions is equal, then offsets have the same meaning for
>>the
>> same partition in both clusters, since the data for both partitions is
>> identical before the offset. This allows a simple consumer to just
>>inquire
>> the local broker and the remote broker for their current offsets and, in
>> case the remote broker is ahead, copy the extra data to the local
>>cluster.
>> Since the consumer offsets are no longer bound to the specific
>>partitioning
>> of a single remote cluster, the consumer could pull from one of any
>>number
>> of remote clusters, BitTorrent-style, if their offsets are ahead of the
>> local offset. The group management problem would reduce to assigning
>>local
>> partitions to different MirrorMaker processes, so the group management
>> could be done locally also in this situation.
>>
>> Regards,
>> Vlad
>>


Re: MirrorMaker improvements

Posted by Jun Rao <ju...@confluent.io>.
To amortize the long RTT across data centers, you can tune the TCP window
size by configuring a larger socket.receive.buffer.bytes in the consumer.

For the last one, it seems that you want identical mirroring. The tricky
thing is to figure out how to avoid duplicates when there is a failure. We
had some related discussion in the context of transactional messaging (
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
).

Thanks,

Jun

On Tue, Mar 24, 2015 at 11:44 AM, vlad.gm@gmail.com <vl...@gmail.com>
wrote:

> Dear all,
>
> I had a short discussion with Jay yesterday at the ACM meetup and he
> suggested writing an email regarding a few possible MirrorMaker
> improvements.
>
> At Turn, we have been using MirrorMaker for a a few months now to
> asynchronously replicate our key/value store data between our datacenters.
> In a way, our system is similar to Linkedin's Databus, but it uses Kafka
> clusters and MirrorMaker as its building blocks. Our overall message rate
> peaks at about 650K/sec and, when pushing data over high bandwidth delay
> product links, we have found some minor bottlenecks.
>
> The MirrorMaker process uses a standard consumer to pull data from a remote
> datacenter. This implies that it opens a single TCP connection to each of
> the remote brokers and muxes requests for different topics and partitions
> over this connection. While this is a good thing in terms of maintaining
> the congestion window open, over long RTT lines with rather high loss rate
> the congestion window will cap, in our case at just a few Mbps. While the
> overall line bandwidth is much higher, this means that we have to start
> multiple MirrorMaker processes (somewhere in the hundreds), in order to
> completely use the line capacity. Being able to pool multiple TCP
> connections from a single consumer to a broker would solve this
> complication.
>
> The standard consumer also uses the remote ZooKeeper in order to manage the
> consumer group. While consumer group management is moving closer to the
> brokers, it might make sense to move the group management to the local
> datacenter, since that would avoid using the long-distance connection for
> this purpose.
>
> Another possible improvement assumes a further constraint, namely that the
> number of partitions for a topic in both datacenters is the same. In my
> opinion, this is a sane constraint, since it preserves the Kafka ordering
> guarantees (per partition), instead of a simple guarantee per key. This
> kind of guarantee can be for example useful in a system that compares
> partition contents to reach eventual consistency using Merkle trees. If the
> number of partitions is equal, then offsets have the same meaning for the
> same partition in both clusters, since the data for both partitions is
> identical before the offset. This allows a simple consumer to just inquire
> the local broker and the remote broker for their current offsets and, in
> case the remote broker is ahead, copy the extra data to the local cluster.
> Since the consumer offsets are no longer bound to the specific partitioning
> of a single remote cluster, the consumer could pull from one of any number
> of remote clusters, BitTorrent-style, if their offsets are ahead of the
> local offset. The group management problem would reduce to assigning local
> partitions to different MirrorMaker processes, so the group management
> could be done locally also in this situation.
>
> Regards,
> Vlad
>