Posted to users@camel.apache.org by Sergey Zhemzhitsky <sz...@gmail.com> on 2016/08/01 08:38:49 UTC

Kafka Producer Performance

Hi Camel Gurus,

I've run into some performance issues with the camel-kafka component while
migrating from 2.17.0 to 2.17.1 and then to 2.17.2.

The Camel route is pretty simple and looks like this:

from("file:/var/lib/app/input")
    .split().simple("\n").streaming()
        .to("direct:kafka");
from("direct:kafka")
    .to("kafka:brokerAddr?topic=messages");

The first issue, with Camel 2.17.0, was the possibility of losing messages
<https://github.com/apache/camel/blob/camel-2.17.0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L101>.
Kafka's native producer buffers messages
<https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L468>,
so if the Kafka broker is unavailable, buffered messages can be lost when the
route is restarted. Despite the risk of losing messages, the performance was
pretty good (~10K rps) thanks to the Kafka producer's buffering.

The second issue, with Camel 2.17.1, was that the performance of the Kafka
producer degraded tremendously (up to 100 times) because it blocked on
every message
<https://github.com/apache/camel/blob/camel-2.17.1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L100>
(although in that case no messages are lost).

The third issue, with Camel 2.17.2 (even though Camel started using async
callbacks
<https://github.com/apache/camel/blob/camel-2.17.2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L180>),
was that the performance was still pretty poor, because Kafka's native
producer could not buffer more than a single message (the direct endpoint
is synchronous).
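The batching argument in the three issues above can be illustrated with a toy model. This is a minimal sketch, not the Kafka client: ToyBatchingProducer, lingerExpired() and the batch size of 4 are hypothetical stand-ins, loosely modelled on the real producer's batch.size and linger.ms settings. It assumes a batch ships either when it is full or when the linger timer fires, and compares a caller that waits for each acknowledgement before handing over the next record (the synchronous direct endpoint case described above) with a caller that keeps handing records over.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class BatchingDemo {

    // Hypothetical toy producer (NOT org.apache.kafka.clients.producer.KafkaProducer):
    // buffers records and ships them in batches.
    static class ToyBatchingProducer {
        private final int batchSize;
        private final List<CompletableFuture<Void>> pending = new ArrayList<>();
        final List<Integer> sentBatchSizes = new ArrayList<>();

        ToyBatchingProducer(int batchSize) {
            this.batchSize = batchSize;
        }

        // Buffers one record; the returned future completes when its batch ships.
        CompletableFuture<Void> send(String line) {
            CompletableFuture<Void> ack = new CompletableFuture<>();
            pending.add(ack);
            if (pending.size() >= batchSize) {
                flush();
            }
            return ack;
        }

        // Stand-in for the linger timer firing: ship whatever is buffered.
        void lingerExpired() {
            if (!pending.isEmpty()) {
                flush();
            }
        }

        private void flush() {
            sentBatchSizes.add(pending.size());
            for (CompletableFuture<Void> ack : pending) {
                ack.complete(null);
            }
            pending.clear();
        }
    }

    // Waits for each ack before sending the next record (synchronous caller).
    public static List<Integer> synchronousCaller(List<String> lines, int batchSize) {
        ToyBatchingProducer producer = new ToyBatchingProducer(batchSize);
        for (String line : lines) {
            CompletableFuture<Void> ack = producer.send(line);
            if (!ack.isDone()) {
                producer.lingerExpired(); // only the timer can complete this ack
            }
            ack.join();
        }
        return producer.sentBatchSizes;
    }

    // Hands over every record first, then waits for all acks.
    public static List<Integer> pipelinedCaller(List<String> lines, int batchSize) {
        ToyBatchingProducer producer = new ToyBatchingProducer(batchSize);
        List<CompletableFuture<Void>> acks = new ArrayList<>();
        for (String line : lines) {
            acks.add(producer.send(line));
        }
        producer.lingerExpired(); // ship the final partial batch
        for (CompletableFuture<Void> ack : acks) {
            ack.join();
        }
        return producer.sentBatchSizes;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            lines.add("line-" + i);
        }
        System.out.println(synchronousCaller(lines, 4)); // [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
        System.out.println(pipelinedCaller(lines, 4));   // [4, 4, 2]
    }
}
```

With the waiting caller every batch holds exactly one record, which matches the single-message buffering described for 2.17.2; the pipelined caller fills whole batches.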

I was able to come up with two workarounds for these issues:

- using a seda endpoint instead of a direct one (Kafka's native producer
is then able to buffer messages, but messages can still be lost because
of the in-memory nature of seda)

- using an aggregator with the direct endpoint (the route becomes more
complicated than it should be, the aggregator adds unnecessary delays,
and why do we need an additional aggregator for batching at all if
Kafka's native producer already does buffering/batching?)

So the question is: is there any way to let Kafka's native producer buffer
more than a single message without using the aggregator EIP and without
losing messages, as can happen with an intermediate seda endpoint?

Kind Regards,
Sergey

Re: Kafka Producer Performance

Posted by Sergey Zhemzhitsky <sz...@gmail.com>.
Hi Daniel,

Thanks a lot for the suggestion! I'll give it a try.

While everything is pretty clear with batch consumers, it is not with
event-driven consumers. For example, the amqp and jms implementations can
prefetch a number of messages, but cannot batch them out of the box.
So to make the camel-kafka producer send messages obtained from event-driven
consumers in batches, it's necessary to prepare the batches manually (i.e. by
means of the aggregator EIP), even though the downstream component
(camel-kafka) supports batching.
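Preparing batches manually amounts to accumulating messages until a size limit is reached and flushing any remainder later. A minimal sketch of that accumulation in plain Java, assuming a hypothetical SizeBatcher class and a fixed batch size; this is not Camel's aggregator API, only an illustration of the behaviour its completionSize and completionTimeout options provide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of Camel: groups pushed messages into batches.
public class SizeBatcher<T> {
    private final int size;
    private final Consumer<List<T>> sink;
    private List<T> current = new ArrayList<>();

    public SizeBatcher(int size, Consumer<List<T>> sink) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
        this.sink = sink;
    }

    // Called once per incoming message, e.g. from an event-driven listener.
    public synchronized void onMessage(T message) {
        current.add(message);
        if (current.size() >= size) {
            flush();
        }
    }

    // Emits a partial batch; a real setup would call this from a timer
    // (compare the aggregator's completionTimeout) and on shutdown.
    public synchronized void flush() {
        if (!current.isEmpty()) {
            List<T> batch = current;
            current = new ArrayList<>();
            sink.accept(batch);
        }
    }

    public static void main(String[] args) {
        List<List<String>> emitted = new ArrayList<>();
        SizeBatcher<String> batcher = new SizeBatcher<>(2, emitted::add);
        for (String m : new String[] {"m1", "m2", "m3", "m4", "m5"}) {
            batcher.onMessage(m);
        }
        batcher.flush();
        System.out.println(emitted); // [[m1, m2], [m3, m4], [m5]]
    }
}
```

The last, partial batch only leaves when flush() is called, which is where the extra delay mentioned above comes from: it waits for the timer rather than going out immediately.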



On Mon, Aug 1, 2016 at 3:22 PM, Daniel Kulp <dk...@apache.org> wrote:


Re: Kafka Producer Performance

Posted by Daniel Kulp <dk...@apache.org>.
For one of my clients, I ended up not using the splitter in Camel and instead used a custom processor that creates an Iterator<byte[]>. This works with the updates to camel-kafka that are included in 2.17.3. In my tests, using the Camel splitter as you have it got about 5K-10K msg/sec; with this approach, I get about 200K. However, within Camel it stays a single message, so anything in the route that needs to look at each line wouldn't really work.



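import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

// Class name is illustrative
public class FileToKafkaRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("file://…….")
            .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    InputStream ins = exchange.getIn().getBody(InputStream.class);
                    // Hand camel-kafka a lazy iterator of lines instead of one
                    // exchange per line. Lines are read one byte at a time, so
                    // wrap the stream in case the converted one is unbuffered.
                    exchange.getIn().setBody(new SplitterIterator(new BufferedInputStream(ins)));
                }
            })
            .to("kafka:brokerAddr?topic=messages"
                + "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer"
                + "&keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
    }

    // Lazily reads '\n'-terminated lines from the stream as byte arrays
    static class SplitterIterator implements Iterator<byte[]> {
        private final InputStream stream;
        private byte[] next;

        SplitterIterator(InputStream i) {
            stream = i;
            next = readNext();
        }

        // Returns the next non-empty line, or null once the stream is exhausted
        private byte[] readNext() {
            try {
                while (true) {
                    int v = stream.read();
                    if (v == -1) {
                        stream.close(); // end of input: release the file handle
                        return null;
                    }
                    ByteArrayOutputStream bout = new ByteArrayOutputStream();
                    while (v != -1 && v != '\n') {
                        bout.write(v);
                        v = stream.read();
                    }
                    // Skip blank lines instead of treating them as end of input
                    if (bout.size() > 0) {
                        return bout.toByteArray();
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public boolean hasNext() {
            return next != null;
        }

        public byte[] next() {
            if (next == null) {
                throw new NoSuchElementException();
            }
            byte[] tmp = next;
            next = readNext();
            return tmp;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
}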


> On Aug 1, 2016, at 4:38 AM, Sergey Zhemzhitsky <sz...@gmail.com> wrote:

-- 
Daniel Kulp
dkulp@apache.org - http://dankulp.com/blog
Talend Community Coder - http://coders.talend.com