Posted to user@spark.apache.org by Colin Kincaid Williams <di...@uw.edu> on 2016/05/02 19:54:41 UTC

Improving performance of a kafka spark streaming app

I've written an application that reads protobuf-serialized entries from
a kafka topic holding 1.7 billion entries and inserts them into hbase.
Currently the environment that I'm running in is Spark 1.2.

With 8 executors and 2 cores, and 2 jobs, I'm only getting between 0
and 2500 writes / second. At that rate it will take much too long to
consume the entries.

I currently believe that the spark kafka receiver is the bottleneck.
I've tried both 1.2 receivers, with the WAL and without, and didn't
notice any large performance difference. I've tried many different
spark configuration options, but can't seem to get better performance.

I saw 80000 requests / second inserting these records into kafka using
yarn / hbase / protobuf / kafka in a bulk fashion.

While hbase inserts might not deliver the same throughput, I'd like to
get at least 10% of it.
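
For scale, here is a rough back-of-the-envelope sketch of what those
rates mean for draining the topic. The figures (1.7 billion entries,
2500 writes / second, 10% of 80000 / second) are taken from the message
above; the class and method names are purely illustrative, and the
estimate assumes a steady rate with no restarts or retries.

```java
// Rough backfill-time estimate. Figures come from the message above;
// the class and method names are illustrative only.
final class BackfillEstimate {
    /** Hours needed to drain totalRecords at a steady recordsPerSecond. */
    static double hoursToDrain(long totalRecords, long recordsPerSecond) {
        return totalRecords / (double) recordsPerSecond / 3600.0;
    }

    public static void main(String[] args) {
        long total = 1_700_000_000L;   // entries in the topic
        long observed = 2_500;         // best observed writes / second
        long target = 80_000 / 10;     // 10% of the 80000 / second seen on ingest

        System.out.println("at " + observed + "/s: " + hoursToDrain(total, observed) + " hours");
        System.out.println("at " + target + "/s: " + hoursToDrain(total, target) + " hours");
    }
}
```

At the best observed rate this works out to roughly 189 hours (close to
8 days); at the 10% target it is about 59 hours.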

My application looks like
https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877

This is my first spark application. I'd appreciate any assistance.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Improving performance of a kafka spark streaming app

Posted by Colin Kincaid Williams <di...@uw.edu>.
I'm attaching a picture from the streaming UI.

On Sat, Jun 18, 2016 at 7:59 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
> There are 25 nodes in the spark cluster.
>
> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
> <mi...@gmail.com> wrote:
>> how many nodes are in your cluster?
>>
>> --num-executors 6 \
>>  --driver-memory 4G \
>>  --executor-memory 2G \
>>  --total-executor-cores 12 \
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>> On 18 June 2016 at 20:40, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>
>>> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>> Kafka using the direct api and inserts content into an hbase cluster,
>>> as described in this thread. I was away from this project for a while
>>> due to events in my family.
>>>
>>> Currently my scheduling delay is high, but the processing time is
>>> stable around a second. I changed my setup to use 6 kafka partitions
>>> on a set of smaller kafka brokers, with fewer disks. I've included
>>> some details below, including the script I use to launch the
>>> application. I'm using a Spark on Hbase library, whose version is
>>> relevant to my Hbase cluster. Is it apparent there is something wrong
>>> with my launch method that could be causing the delay, related to the
>>> included jars?
>>>
>>> Or is there something wrong with the very simple approach I'm taking
>>> for the application?
>>>
>>> Any advice is appreciated.
>>>
>>>
>>> The application:
>>>
>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>
>>>
>>> From the streaming UI I get something like:
>>>
>>> Completed Batches (last 1000 out of 27136)
>>>
>>> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
>>> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
>>> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
>>> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>>>
>>>
>>> Here's how I'm launching the spark application.
>>>
>>>
>>> #!/usr/bin/env bash
>>>
>>> export SPARK_CONF_DIR=/home/colin.williams/spark
>>> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>
>>> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>   --class com.example.KafkaToHbase \
>>>   --master spark://spark_master:7077 \
>>>   --deploy-mode client \
>>>   --num-executors 6 \
>>>   --driver-memory 4G \
>>>   --executor-memory 2G \
>>>   --total-executor-cores 12 \
>>>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>>   --conf spark.app.name="Kafka To Hbase" \
>>>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>   --conf spark.eventLog.enabled=false \
>>>   --conf spark.eventLog.overwrite=true \
>>>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>   --conf spark.streaming.backpressure.enabled=false \
>>>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>>
>>> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <di...@uw.edu>
>>> wrote:
>>> > Thanks Cody, I can see that the partitions are well distributed...
>>> > Then I'm in the process of using the direct api.
>>> >
>>> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <co...@koeninger.org>
>>> > wrote:
>>> >> 60 partitions in and of itself shouldn't be a big performance issue
>>> >> (as long as producers are distributing across partitions evenly).
>>> >>
>>> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <di...@uw.edu>
>>> >> wrote:
>>> >>> Thanks again Cody. Regarding the details: 66 kafka partitions on 3
>>> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>>> >>> issue with the receiver was the large number of partitions. I had
>>> >>> miscounted the disks, so 11*3*2 is how I decided to partition my
>>> >>> topic on insertion (by my own unjustified reasoning, on a first
>>> >>> attempt). This worked well enough for me: I put 1.7 billion entries
>>> >>> into Kafka with a map reduce job in 5 and a half hours.
>>> >>>
>>> >>> I was concerned using spark 1.5.2 because I'm currently putting my
>>> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library jars
>>> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>>> >>> yesterday, I tried building against 1.5.2. So far it's running without
>>> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
>>> >>> improvement using the same code, but I'll see how the direct api
>>> >>> handles it. In the end I can reduce the number of partitions in Kafka
>>> >>> if it causes big performance issues.
>>> >>>
>>> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <co...@koeninger.org>
>>> >>> wrote:
>>> >>>> print() isn't really the best way to benchmark things, since it calls
>>> >>>> take(10) under the covers, but 380 records / second for a single
>>> >>>> receiver doesn't sound right in any case.
>>> >>>>
>>> >>>> Am I understanding correctly that you're trying to process a large
>>> >>>> number of already-existing kafka messages, not keep up with an
>>> >>>> incoming stream?  Can you give any details (e.g. hardware, number of
>>> >>>> topicpartitions, etc)?
>>> >>>>
>>> >>>> Really though, I'd try to start with spark 1.6 and direct streams, or
>>> >>>> even just kafkacat, as a baseline.
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>> >>>> <di...@uw.edu> wrote:
>>> >>>>> Hello again. I searched for "backport kafka" in the list archives but
>>> >>>>> couldn't find anything but a post from Spark 0.7.2. I was going to
>>> >>>>> use accumulators to make a counter, but then saw the Receiver
>>> >>>>> Statistics on the Streaming tab. Then I removed all other
>>> >>>>> "functionality" except:
>>> >>>>>
>>> >>>>>     // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
>>> >>>>>     //     Class<V> valueTypeClass, Class<U> keyDecoderClass,
>>> >>>>>     //     Class<T> valueDecoderClass,
>>> >>>>>     //     java.util.Map<String,String> kafkaParams,
>>> >>>>>     //     java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>>> >>>>>         .createStream(jssc, byte[].class, byte[].class,
>>> >>>>>             kafka.serializer.DefaultDecoder.class,
>>> >>>>>             kafka.serializer.DefaultDecoder.class,
>>> >>>>>             kafkaParamsMap, topicMap, StorageLevel.MEMORY_AND_DISK_SER());
>>> >>>>>
>>> >>>>>     dstream.print();
>>> >>>>>
>>> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing around
>>> >>>>> 380 records / second. To get anywhere near the 10% mentioned above,
>>> >>>>> I'd need to run around 21 receivers, assuming 380 records / second,
>>> >>>>> just using the print output. This seems awfully high to me,
>>> >>>>> considering that I wrote 80000+ records a second to Kafka from a
>>> >>>>> mapreduce job, and that my bottleneck was likely Hbase. Again using
>>> >>>>> the 380 estimate, I would need 200+ receivers to reach a similar
>>> >>>>> amount of reads.
>>> >>>>>
>>> >>>>> Even given the issues with the 1.2 receivers, is this the expected way
>>> >>>>> to use the Kafka streaming API, or am I doing something terribly
>>> >>>>> wrong?
>>> >>>>>
>>> >>>>> My application looks like
>>> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>> >>>>>
>>> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <co...@koeninger.org>
>>> >>>>> wrote:
>>> >>>>>> Have you tested for read throughput (without writing to hbase, just
>>> >>>>>> deserialize)?
>>> >>>>>>
>>> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?  The
>>> >>>>>> kafka direct stream is available starting with 1.3.  If you're stuck
>>> >>>>>> on 1.2, I believe there have been some attempts to backport it;
>>> >>>>>> search the mailing list archives.
>>

Re: Improving performance of a kafka spark streaming app

Posted by Cody Koeninger <co...@koeninger.org>.
Unless I'm misreading the image you posted, it does show event counts
for the single batch that is still running, with 1.7 billion events in
it.  The recent batches do show 0 events, but I'm guessing that's
because they're actually empty.

When you said you had a kafka topic with 1.7 billion events in it, did
you mean it just statically contains that many events, and no new
events are coming in currently?  If that's the case, you may be better
off just generating RDDs of an appropriate range of offsets, one after
the other, rather than using streaming.
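
As a minimal sketch of the "one range of offsets after the other" idea,
the helper below only does the bookkeeping: it splits one partition's
offsets into fixed-size [from, until) ranges. The class name, method
name, and chunk size are assumptions made for illustration. In an actual
Spark 1.x job each pair would presumably be turned into an OffsetRange
and passed to KafkaUtils.createRDD, which is not shown here because it
needs a running cluster and the Kafka integration jars.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper: splits one partition's offsets into fixed-size
// [from, until) ranges. Names and the chunk size are assumptions.
final class OffsetChunks {
    /** Returns {from, until} pairs covering [start, end) in steps of chunkSize. */
    static List<long[]> split(long start, long end, long chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        for (long from = start; from < end; from += chunkSize) {
            long until = Math.min(from + chunkSize, end); // last chunk may be short
            ranges.add(new long[] {from, until});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Hypothetical partition holding offsets 0..2499999, read 1M at a time.
        for (long[] r : split(0L, 2_500_000L, 1_000_000L)) {
            // In a Spark job, r[0] and r[1] would be the fromOffset and
            // untilOffset of one OffsetRange for this topic and partition.
            System.out.println(r[0] + " -> " + r[1]);
        }
    }
}
```

Processing the ranges sequentially keeps each batch job bounded in size,
so the chunk size plays a role similar to maxRatePerPartition in the
streaming setup.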

I'm also still not clear if you have tried benchmarking a job that
simply reads from your topic, without inserting into hbase.

On Thu, Jun 23, 2016 at 12:09 AM, Colin Kincaid Williams <di...@uw.edu> wrote:
> Streaming UI tab showing empty events and very different metrics than on 1.5.2
>
> On Thu, Jun 23, 2016 at 5:06 AM, Colin Kincaid Williams <di...@uw.edu> wrote:
>> After a bit of effort I moved from a Spark cluster running 1.5.2 to a
>> Yarn cluster running 1.6.1 jars. I'm still setting
>> maxRatePerPartition. The completed batches are no longer showing the
>> number of events processed in the Streaming UI tab. I'm getting around
>> 4k inserts per second in hbase, but I haven't yet tried to remove or
>> reset maxRatePerPartition. I will attach a screenshot of the UI tab.
>> It shows significantly lower figures for processing and delay times
>> than the previously posted shot. It also shows the batches as empty,
>> however I see the requests hitting hbase.
>>
>> Then it's possible my issues were related to running on the Spark
>> 1.5.2 cluster. Also is the missing event count in the completed
>> batches a bug? Should I file an issue?
>>
>> On Tue, Jun 21, 2016 at 9:04 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>> Thanks @Cody, I will try that out. In the interim, I tried to validate
>>> my Hbase cluster by running a random write test and see 30-40K writes
>>> per second. This suggests there is noticeable room for improvement.
>>>
>>> On Tue, Jun 21, 2016 at 8:32 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>>> Take HBase out of the equation and just measure what your read
>>>> performance is by doing something like
>>>>
>>>> createDirectStream(...).foreachRDD(_.foreach(println))
>>>>
>>>> not take() or print()
>>>>
>>>> On Tue, Jun 21, 2016 at 3:19 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>>> @Cody I was able to bring my processing time down to a second by
>>>>> setting maxRatePerPartition as discussed. My bad that I didn't
>>>>> recognize it as the cause of my scheduling delay.
>>>>>
>>>>> Since then I've tried experimenting with a larger Spark Context
>>>>> duration. I've been trying to get some noticeable improvement
>>>>> inserting messages from Kafka -> Hbase using the above application.
>>>>> I'm currently getting around 3500 inserts / second on a 9 node hbase
>>>>> cluster. So far, I haven't been able to get much more throughput. Then
>>>>> I'm looking for advice here how I should tune Kafka and Spark for this
>>>>> job.
>>>>>
>>>>> I can create a kafka topic with as many partitions as I want. I can
>>>>> set the Duration and maxRatePerPartition. I have 1.7 billion messages
>>>>> that I can insert rather quickly into the Kafka queue, and I'd like to
>>>>> get them into Hbase as quickly as possible.
>>>>>
>>>>> I'm looking for advice regarding # Kafka Topic Partitions / Streaming
>>>>> Duration / maxRatePerPartition / any other spark settings or code
>>>>> changes that I should make to try to get a better consumption rate.
>>>>>
>>>>> Thanks for all the help so far, this is the first Spark application I
>>>>> have written.
>>>>>
>>>>> On Mon, Jun 20, 2016 at 12:32 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>>>> I'll try dropping maxRatePerPartition to 400, or maybe even lower.
>>>>>> However, even when the application starts up I have this large
>>>>>> scheduling delay. I will report my progress later on.
>>>>>>
>>>>>> On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>>>>>> If your batch time is 1 second and your average processing time is
>>>>>>> 1.16 seconds, you're always going to be falling behind.  That would
>>>>>>> explain why you've built up an hour of scheduling delay after eight
>>>>>>> hours of running.
>>>>>>>
>>>>>>> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>>>>>> Hi Mich again,
>>>>>>>>
>>>>>>>> Regarding batch window, etc. I have provided the sources, but I'm not
>>>>>>>> currently calling the window function. Did you see the program source?
>>>>>>>> It's only 100 lines.
>>>>>>>>
>>>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>
>>>>>>>> Then I would expect I'm using defaults, other than what has been shown
>>>>>>>> in the configuration.
>>>>>>>>
>>>>>>>> For example:
>>>>>>>>
>>>>>>>> In the launcher configuration I set --conf
>>>>>>>> spark.streaming.kafka.maxRatePerPartition=500 \ and I believe there
>>>>>>>> are 500 messages for the duration set in the application:
>>>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
>>>>>>>> Duration(1000));
>>>>>>>>
>>>>>>>>
>>>>>>>> Then with the --num-executors 6 \ submit flag, and the
>>>>>>>> spark.streaming.kafka.maxRatePerPartition=500 I think that's how we
>>>>>>>> arrive at the 3000 events per batch in the UI, pasted above.
>>>>>>>>
>>>>>>>> Feel free to correct me if I'm wrong.
>>>>>>>>
>>>>>>>> Then are you suggesting that I set the window?
>>>>>>>>
>>>>>>>> Maybe following this as reference:
>>>>>>>>
>>>>>>>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>>>>>>>>
>>>>>>>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>>>>>>>> <mi...@gmail.com> wrote:
>>>>>>>>> Ok
>>>>>>>>>
>>>>>>>>> What is the set up for these please?
>>>>>>>>>
>>>>>>>>> batch window
>>>>>>>>> window length
>>>>>>>>> sliding interval
>>>>>>>>>
>>>>>>>>> And also in each batch window how much data do you get in (no of messages in
>>>>>>>>> the topic whatever)?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>
>>>>>>>>> On 18 June 2016 at 21:01, Mich Talebzadeh <mi...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> I believe you have an issue with performance?
>>>>>>>>>>
>>>>>>>>>> have you checked spark GUI (default 4040) for details including shuffles
>>>>>>>>>> etc?
>>>>>>>>>>
>>>>>>>>>> HTH
>>>>>>>>>>
>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>
>>>>>>>>>> On 18 June 2016 at 20:59, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>>>>>>>>>
>>>>>>>>>>> There are 25 nodes in the spark cluster.
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>>>>>>>>>>> <mi...@gmail.com> wrote:
>>>>>>>>>>> > how many nodes are in your cluster?
>>>>>>>>>>> >
>>>>>>>>>>> > --num-executors 6 \
>>>>>>>>>>> >  --driver-memory 4G \
>>>>>>>>>>> >  --executor-memory 2G \
>>>>>>>>>>> >  --total-executor-cores 12 \
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > Dr Mich Talebzadeh
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > LinkedIn
>>>>>>>>>>> >
>>>>>>>>>>> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > http://talebzadehmich.wordpress.com
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <di...@uw.edu>
>>>>>>>>>>> > wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>>>>>>>>>> >> Kafka using the direct api and inserts content into an hbase cluster,
>>>>>>>>>>> >> as described in this thread. I was away from this project for awhile
>>>>>>>>>>> >> due to events in my family.
>>>>>>>>>>> >>
>>>>>>>>>>> >> Currently my scheduling delay is high, but the processing time is
>>>>>>>>>>> >> stable around a second. I changed my setup to use 6 kafka partitions
>>>>>>>>>>> >> on a set of smaller kafka brokers, with fewer disks. I've included
>>>>>>>>>>> >> some details below, including the script I use to launch the
>>>>>>>>>>> >> application. I'm using a Spark on Hbase library, whose version is
>>>>>>>>>>> >> relevant to my Hbase cluster. Is it apparent there is something wrong
>>>>>>>>>>> >> with my launch method that could be causing the delay, related to the
>>>>>>>>>>> >> included jars?
>>>>>>>>>>> >>
>>>>>>>>>>> >> Or is there something wrong with the very simple approach I'm taking
>>>>>>>>>>> >> for the application?
>>>>>>>>>>> >>
>>>>>>>>>>> >> Any advice is appriciated.
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>> >> The application:
>>>>>>>>>>> >>
>>>>>>>>>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>> >> From the streaming UI I get something like:
>>>>>>>>>>> >>
>>>>>>>>>>> >> table Completed Batches (last 1000 out of 27136)
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>> >> Batch Time Input Size Scheduling Delay (?) Processing Time (?) Total
>>>>>>>>>>> >> Delay (?) Output Ops: Succeeded/Total
>>>>>>>>>>> >>
>>>>>>>>>>> >> 2016/06/18 11:21:32 3000 events 1.2 h 1 s 1.2 h 1/1
>>>>>>>>>>> >>
>>>>>>>>>>> >> 2016/06/18 11:21:31 3000 events 1.2 h 1 s 1.2 h 1/1
>>>>>>>>>>> >>
>>>>>>>>>>> >> 2016/06/18 11:21:30 3000 events 1.2 h 1 s 1.2 h 1/1
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>> >> Here's how I'm launching the spark application.
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>> >> #!/usr/bin/env bash
>>>>>>>>>>> >>
>>>>>>>>>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>>>>>>>>>> >>
>>>>>>>>>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>>>>>>>>> >>
>>>>>>>>>>> >> export
>>>>>>>>>>> >>
>>>>>>>>>>> >> HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --class com.example.KafkaToHbase \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --master spark://spark_master:7077 \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --deploy-mode client \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --num-executors 6 \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --driver-memory 4G \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --executor-memory 2G \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --total-executor-cores 12 \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --jars
>>>>>>>>>>> >>
>>>>>>>>>>> >> /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar
>>>>>>>>>>> >> \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --conf spark.app.name="Kafka To Hbase" \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --conf spark.eventLog.enabled=false \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --conf spark.eventLog.overwrite=true \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --conf spark.streaming.backpressure.enabled=false \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>>>>>>>>> >>
>>>>>>>>>>> >> --driver-java-options
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>> >> -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*
>>>>>>>>>>> >> \
>>>>>>>>>>> >>
>>>>>>>>>>> >> /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable"
>>>>>>>>>>> >> "broker1:9092,broker2:9092"
>>>>>>>>>>> >>
>>>>>>>>>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams
>>>>>>>>>>> >> <di...@uw.edu>
>>>>>>>>>>> >> wrote:
>>>>>>>>>>> >> > Thanks Cody, I can see that the partitions are well distributed...
>>>>>>>>>>> >> > Then I'm in the process of using the direct api.
>>>>>>>>>>> >> >
>>>>>>>>>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <co...@koeninger.org>
>>>>>>>>>>> >> > wrote:
>>>>>>>>>>> >> >> 60 partitions in and of itself shouldn't be a big performance issue
>>>>>>>>>>> >> >> (as long as producers are distributing across partitions evenly).
>>>>>>>>>>> >> >>
>>>>>>>>>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams
>>>>>>>>>>> >> >> <di...@uw.edu>
>>>>>>>>>>> >> >> wrote:
>>>>>>>>>>> >> >>> Thanks again Cody. Regarding the details 66 kafka partitions on 3
>>>>>>>>>>> >> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>>>>>>>>>>> >> >>> issue with the receiver was the large number of partitions. I had
>>>>>>>>>>> >> >>> miscounted the disks and so 11*3*2 is how I decided to partition
>>>>>>>>>>> >> >>> my
>>>>>>>>>>> >> >>> topic on insertion, ( by my own, unjustified reasoning, on a first
>>>>>>>>>>> >> >>> attempt ) . This worked well enough for me, I put 1.7 billion
>>>>>>>>>>> >> >>> entries
>>>>>>>>>>> >> >>> into Kafka on a map reduce job in 5 and a half hours.
>>>>>>>>>>> >> >>>
>>>>>>>>>>> >> >>> I was concerned using spark 1.5.2 because I'm currently putting my
>>>>>>>>>>> >> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library
>>>>>>>>>>> >> >>> jars
>>>>>>>>>>> >> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>>>>>>>>>>> >> >>> yesterday, I tried building against 1.5.2. So far it's running
>>>>>>>>>>> >> >>> without
>>>>>>>>>>> >> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
>>>>>>>>>>> >> >>> improvement using the same code, but I'll see how the direct api
>>>>>>>>>>> >> >>> handles it. In the end I can reduce the number of partitions in
>>>>>>>>>>> >> >>> Kafka
>>>>>>>>>>> >> >>> if it causes big performance issues.
>>>>>>>>>>> >> >>>
>>>>>>>>>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger
>>>>>>>>>>> >> >>> <co...@koeninger.org>
>>>>>>>>>>> >> >>> wrote:
>>>>>>>>>>> >> >>>> print() isn't really the best way to benchmark things, since it
>>>>>>>>>>> >> >>>> calls
>>>>>>>>>>> >> >>>> take(10) under the covers, but 380 records / second for a single
>>>>>>>>>>> >> >>>> receiver doesn't sound right in any case.
>>>>>>>>>>> >> >>>>
>>>>>>>>>>> >> >>>> Am I understanding correctly that you're trying to process a
>>>>>>>>>>> >> >>>> large
>>>>>>>>>>> >> >>>> number of already-existing kafka messages, not keep up with an
>>>>>>>>>>> >> >>>> incoming stream?  Can you give any details (e.g. hardware, number
>>>>>>>>>>> >> >>>> of
>>>>>>>>>>> >> >>>> topicpartitions, etc)?
>>>>>>>>>>> >> >>>>
>>>>>>>>>>> >> >>>> Really though, I'd try to start with spark 1.6 and direct
>>>>>>>>>>> >> >>>> streams, or
>>>>>>>>>>> >> >>>> even just kafkacat, as a baseline.
>>>>>>>>>>> >> >>>>
>>>>>>>>>>> >> >>>>
>>>>>>>>>>> >> >>>>
>>>>>>>>>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>>>>>>>>>> >> >>>> <di...@uw.edu> wrote:
>>>>>>>>>>> >> >>>>> Hello again. I searched for "backport kafka" in the list
>>>>>>>>>>> >> >>>>> archives
>>>>>>>>>>> >> >>>>> but
>>>>>>>>>>> >> >>>>> couldn't find anything but a post from Spark 0.7.2 . I was going
>>>>>>>>>>> >> >>>>> to
>>>>>>>>>>> >> >>>>> use accumulators to make a counter, but then saw on the
>>>>>>>>>>> >> >>>>> Streaming
>>>>>>>>>>> >> >>>>> tab
>>>>>>>>>>> >> >>>>> the Receiver Statistics. Then I removed all other
>>>>>>>>>>> >> >>>>> "functionality"
>>>>>>>>>>> >> >>>>> except:
>>>>>>>>>>> >> >>>>>
>>>>>>>>>>> >> >>>>>
>>>>>>>>>>> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream =
>>>>>>>>>>> >> >>>>> KafkaUtils
>>>>>>>>>>> >> >>>>>       //createStream(JavaStreamingContext jssc,Class<K>
>>>>>>>>>>> >> >>>>> keyTypeClass,Class<V> valueTypeClass, Class<U> keyDecoderClass,
>>>>>>>>>>> >> >>>>> Class<T> valueDecoderClass, java.util.Map<String,String>
>>>>>>>>>>> >> >>>>> kafkaParams,
>>>>>>>>>>> >> >>>>> java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>>>>>>>>>> >> >>>>>       .createStream(jssc, byte[].class, byte[].class,
>>>>>>>>>>> >> >>>>> kafka.serializer.DefaultDecoder.class,
>>>>>>>>>>> >> >>>>> kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
>>>>>>>>>>> >> >>>>> StorageLevel.MEMORY_AND_DISK_SER());
>>>>>>>>>>> >> >>>>>
>>>>>>>>>>> >> >>>>>        dstream.print();
>>>>>>>>>>> >> >>>>>
>>>>>>>>>>> >> >>>>> Then in the Recieiver Stats for the single receiver, I'm seeing
>>>>>>>>>>> >> >>>>> around
>>>>>>>>>>> >> >>>>> 380 records / second. Then to get anywhere near my 10% mentioned
>>>>>>>>>>> >> >>>>> above, I'd need to run around 21 receivers, assuming 380 records
>>>>>>>>>>> >> >>>>> /
>>>>>>>>>>> >> >>>>> second, just using the print output. This seems awfully high to
>>>>>>>>>>> >> >>>>> me,
>>>>>>>>>>> >> >>>>> considering that I wrote 80000+ records a second to Kafka from a
>>>>>>>>>>> >> >>>>> mapreduce job, and that my bottleneck was likely Hbase. Again
>>>>>>>>>>> >> >>>>> using
>>>>>>>>>>> >> >>>>> the 380 estimate, I would need 200+ receivers to reach a similar
>>>>>>>>>>> >> >>>>> amount of reads.
>>>>>>>>>>> >> >>>>>
>>>>>>>>>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the
>>>>>>>>>>> >> >>>>> expected
>>>>>>>>>>> >> >>>>> way
>>>>>>>>>>> >> >>>>> to use the Kafka streaming API, or am I doing something terribly
>>>>>>>>>>> >> >>>>> wrong?
>>>>>>>>>>> >> >>>>>
>>>>>>>>>>> >> >>>>> My application looks like
>>>>>>>>>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>>>> >> >>>>>
>>>>>>>>>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger
>>>>>>>>>>> >> >>>>> <co...@koeninger.org>
>>>>>>>>>>> >> >>>>> wrote:
>>>>>>>>>>> >> >>>>>> Have you tested for read throughput (without writing to hbase, just
>>>>>>>>>>> >> >>>>>> deserialize)?
>>>>>>>>>>> >> >>>>>>
>>>>>>>>>>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible? The
>>>>>>>>>>> >> >>>>>> kafka direct stream is available starting with 1.3.  If you're stuck
>>>>>>>>>>> >> >>>>>> on 1.2, I believe there have been some attempts to backport it, search
>>>>>>>>>>> >> >>>>>> the mailing list archives.
>>>>>>>>>>> >> >>>>>>
>>>>>>>>>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>>>>>>>>>>> >> >>>>>> <di...@uw.edu> wrote:
>>>>>>>>>>> >> >>>>>>> I've written an application to get content from a kafka topic with 1.7
>>>>>>>>>>> >> >>>>>>> billion entries,  get the protobuf serialized entries, and insert into
>>>>>>>>>>> >> >>>>>>> hbase. Currently the environment that I'm running in is Spark 1.2.
>>>>>>>>>>> >> >>>>>>>
>>>>>>>>>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
>>>>>>>>>>> >> >>>>>>> 0-2500 writes / second. This will take much too long to consume the
>>>>>>>>>>> >> >>>>>>> entries.
>>>>>>>>>>> >> >>>>>>>
>>>>>>>>>>> >> >>>>>>> I currently believe that the spark kafka receiver is the bottleneck.
>>>>>>>>>>> >> >>>>>>> I've tried both 1.2 receivers, with the WAL and without, and didn't
>>>>>>>>>>> >> >>>>>>> notice any large performance difference. I've tried many different
>>>>>>>>>>> >> >>>>>>> spark configuration options, but can't seem to get better performance.
>>>>>>>>>>> >> >>>>>>>
>>>>>>>>>>> >> >>>>>>> I saw 80000 requests / second inserting these records into kafka using
>>>>>>>>>>> >> >>>>>>> yarn / hbase / protobuf / kafka in a bulk fashion.
>>>>>>>>>>> >> >>>>>>>
>>>>>>>>>>> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd like to
>>>>>>>>>>> >> >>>>>>> at least get 10%.
>>>>>>>>>>> >> >>>>>>>
>>>>>>>>>>> >> >>>>>>> My application looks like
>>>>>>>>>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>>>> >> >>>>>>>
>>>>>>>>>>> >> >>>>>>> This is my first spark application. I'd appreciate any assistance.
>>>>>>>>>>> >> >>>>>>>
>>>>>>>>>>> >> >>>>>>>
>>>>>>>>>>> >> >>>>>>>
>>>>>>>>>>> >> >>>>>>>
>>>>>>>>>>> >> >>>>
>>>>>>>>>>> >> >>>>
>>>>>>>>>>> >> >>>>
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>



Re: Improving performance of a kafka spark streaming app

Posted by Colin Kincaid Williams <di...@uw.edu>.
Streaming UI tab showing empty events and very different metrics than on 1.5.2
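
For comparison, the 1.5.2 run quoted further down reported a scheduling
delay of 1.2 h after 27136 one-second batches, and Cody's reply puts the
average processing time at 1.16 seconds. A rough sketch of how such a
backlog builds up is below. The class and method names are made up for
illustration, and the model assumes every batch takes exactly the average
processing time, which is a simplification.

```java
public class SchedulingDelayEstimate {

    // Backlog in seconds after `batches` batches, assuming each batch takes
    // `processingSeconds` to run while a new batch is scheduled every
    // `batchSeconds`. Returns 0 when processing keeps up with the interval.
    static double delaySeconds(long batches, double batchSeconds, double processingSeconds) {
        double lagPerBatch = Math.max(0.0, processingSeconds - batchSeconds);
        return batches * lagPerBatch;
    }

    public static void main(String[] args) {
        // Figures taken from the thread: 27136 completed batches,
        // 1 s batch interval, 1.16 s average processing time.
        double seconds = delaySeconds(27136, 1.0, 1.16);
        System.out.println("estimated scheduling delay (s): " + seconds);
        System.out.println("estimated scheduling delay (h): " + seconds / 3600.0);
    }
}
```

Under these assumptions the estimate comes out close to the 1.2 h shown in
the old UI table, which fits the explanation that the job was simply
falling behind a little on every batch.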

On Thu, Jun 23, 2016 at 5:06 AM, Colin Kincaid Williams <di...@uw.edu> wrote:
> After a bit of effort I moved from a Spark cluster running 1.5.2 to a
> Yarn cluster running 1.6.1 jars. I'm still setting maxRatePerPartition.
> The completed batches are no longer showing the number of events
> processed in the Streaming UI tab. I'm getting around 4k inserts per
> second in hbase, but I haven't yet tried to remove or reset
> maxRatePerPartition. I will attach a screenshot of the UI tab. It shows
> significantly lower figures for processing and delay times than the
> previously posted shot. It also shows the batches as empty, even though
> I see the requests hitting hbase.
>
> Then it's possible my issues were related to running on the Spark
> 1.5.2 cluster. Also is the missing event count in the completed
> batches a bug? Should I file an issue?
>
> On Tue, Jun 21, 2016 at 9:04 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>> Thanks @Cody, I will try that out. In the interim, I tried to validate
>> my Hbase cluster by running a random write test and saw 30-40K writes
>> per second. This suggests there is noticeable room for improvement.
>>
>> On Tue, Jun 21, 2016 at 8:32 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>> Take HBase out of the equation and just measure what your read
>>> performance is by doing something like
>>>
>>> createDirectStream(...).foreachRDD(_.foreach(println))
>>>
>>> not take() or print()
>>>
>>> On Tue, Jun 21, 2016 at 3:19 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>> @Cody I was able to bring my processing time down to a second by
>>>> setting maxRatePerPartition as discussed. My bad that I didn't
>>>> recognize it as the cause of my scheduling delay.
>>>>
>>>> Since then I've tried experimenting with a larger Spark Context
>>>> duration. I've been trying to get some noticeable improvement
>>>> inserting messages from Kafka -> Hbase using the above application.
>>>> I'm currently getting around 3500 inserts / second on a 9 node hbase
>>>> cluster. So far, I haven't been able to get much more throughput. So
>>>> I'm looking for advice here on how I should tune Kafka and Spark for this
>>>> job.
>>>>
>>>> I can create a kafka topic with as many partitions as I want. I can
>>>> set the Duration and maxRatePerPartition. I have 1.7 billion messages
>>>> that I can insert rather quickly into the Kafka queue, and I'd like to
>>>> get them into Hbase as quickly as possible.
>>>>
>>>> I'm looking for advice regarding # Kafka Topic Partitions / Streaming
>>>> Duration / maxRatePerPartition / any other spark settings or code
>>>> changes that I should make to try to get a better consumption rate.
>>>>
>>>> Thanks for all the help so far, this is the first Spark application I
>>>> have written.
>>>>
>>>> On Mon, Jun 20, 2016 at 12:32 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>>> I'll try dropping maxRatePerPartition to 400, or maybe even lower.
>>>>> However, even at application startup I have this large scheduling
>>>>> delay. I will report my progress later on.
>>>>>
>>>>> On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>>>>> If your batch time is 1 second and your average processing time is
>>>>>> 1.16 seconds, you're always going to be falling behind.  That would
>>>>>> explain why you've built up an hour of scheduling delay after eight
>>>>>> hours of running.
>>>>>>
>>>>>> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>>>>> Hi Mich again,
>>>>>>>
>>>>>>> Regarding batch window, etc. I have provided the sources, but I'm not
>>>>>>> currently calling the window function. Did you see the program source?
>>>>>>> It's only 100 lines.
>>>>>>>
>>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>
>>>>>>> Then I would expect I'm using defaults, other than what has been shown
>>>>>>> in the configuration.
>>>>>>>
>>>>>>> For example:
>>>>>>>
>>>>>>> In the launcher configuration I set --conf
>>>>>>> spark.streaming.kafka.maxRatePerPartition=500 \ and I believe there
>>>>>>> are 500 messages for the duration set in the application:
>>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
>>>>>>> Duration(1000));
>>>>>>>
>>>>>>>
>>>>>>> Then with the --num-executors 6 \ submit flag, and the
>>>>>>> spark.streaming.kafka.maxRatePerPartition=500 I think that's how we
>>>>>>> arrive at the 3000 events per batch in the UI, pasted above.
>>>>>>>
>>>>>>> Feel free to correct me if I'm wrong.
>>>>>>>
>>>>>>> Then are you suggesting that I set the window?
>>>>>>>
>>>>>>> Maybe following this as reference:
>>>>>>>
>>>>>>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>>>>>>>
>>>>>>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>>>>>>> <mi...@gmail.com> wrote:
>>>>>>>> Ok
>>>>>>>>
>>>>>>>> What is the set up for these please?
>>>>>>>>
>>>>>>>> batch window
>>>>>>>> window length
>>>>>>>> sliding interval
>>>>>>>>
>>>>>>>> And also in each batch window how much data do you get in (no of messages in
>>>>>>>> the topic whatever)?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> LinkedIn
>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 18 June 2016 at 21:01, Mich Talebzadeh <mi...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> I believe you have an issue with performance?
>>>>>>>>>
>>>>>>>>> have you checked spark GUI (default 4040) for details including shuffles
>>>>>>>>> etc?
>>>>>>>>>
>>>>>>>>> HTH
>>>>>>>>>
>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> LinkedIn
>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 18 June 2016 at 20:59, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>>>>>>>>
>>>>>>>>>> There are 25 nodes in the spark cluster.
>>>>>>>>>>
>>>>>>>>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>>>>>>>>>> <mi...@gmail.com> wrote:
>>>>>>>>>> > how many nodes are in your cluster?
>>>>>>>>>> >
>>>>>>>>>> > --num-executors 6 \
>>>>>>>>>> >  --driver-memory 4G \
>>>>>>>>>> >  --executor-memory 2G \
>>>>>>>>>> >  --total-executor-cores 12 \
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > Dr Mich Talebzadeh
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > LinkedIn
>>>>>>>>>> >
>>>>>>>>>> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > http://talebzadehmich.wordpress.com
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <di...@uw.edu>
>>>>>>>>>> > wrote:
>>>>>>>>>> >>
>>>>>>>>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>>>>>>>>> >> Kafka using the direct api and inserts content into an hbase cluster,
>>>>>>>>>> >> as described in this thread. I was away from this project for awhile
>>>>>>>>>> >> due to events in my family.
>>>>>>>>>> >>
>>>>>>>>>> >> Currently my scheduling delay is high, but the processing time is
>>>>>>>>>> >> stable around a second. I changed my setup to use 6 kafka partitions
>>>>>>>>>> >> on a set of smaller kafka brokers, with fewer disks. I've included
>>>>>>>>>> >> some details below, including the script I use to launch the
>>>>>>>>>> >> application. I'm using a Spark on Hbase library, whose version is
>>>>>>>>>> >> relevant to my Hbase cluster. Is it apparent there is something wrong
>>>>>>>>>> >> with my launch method that could be causing the delay, related to the
>>>>>>>>>> >> included jars?
>>>>>>>>>> >>
>>>>>>>>>> >> Or is there something wrong with the very simple approach I'm taking
>>>>>>>>>> >> for the application?
>>>>>>>>>> >>
>>>>>>>>>> >> Any advice is appreciated.
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >> The application:
>>>>>>>>>> >>
>>>>>>>>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >> From the streaming UI I get something like:
>>>>>>>>>> >>
>>>>>>>>>> >> Completed Batches (last 1000 out of 27136)
>>>>>>>>>> >>
>>>>>>>>>> >> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
>>>>>>>>>> >> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>>>>>>> >> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>>>>>>> >> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >> Here's how I'm launching the spark application.
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >> #!/usr/bin/env bash
>>>>>>>>>> >>
>>>>>>>>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>>>>>>>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>>>>>>>> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>>>>>>>> >>
>>>>>>>>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>>>>>>>> >>   --class com.example.KafkaToHbase \
>>>>>>>>>> >>   --master spark://spark_master:7077 \
>>>>>>>>>> >>   --deploy-mode client \
>>>>>>>>>> >>   --num-executors 6 \
>>>>>>>>>> >>   --driver-memory 4G \
>>>>>>>>>> >>   --executor-memory 2G \
>>>>>>>>>> >>   --total-executor-cores 12 \
>>>>>>>>>> >>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>>>>>>>>> >>   --conf spark.app.name="Kafka To Hbase" \
>>>>>>>>>> >>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>>>>>>>> >>   --conf spark.eventLog.enabled=false \
>>>>>>>>>> >>   --conf spark.eventLog.overwrite=true \
>>>>>>>>>> >>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>>>>>>>> >>   --conf spark.streaming.backpressure.enabled=false \
>>>>>>>>>> >>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>>>>>>>> >>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>>>>>>>> >>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>>>>>>>>> >>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>>>>>>>>> >>
>>>>>>>>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams
>>>>>>>>>> >> <di...@uw.edu>
>>>>>>>>>> >> wrote:
>>>>>>>>>> >> > Thanks Cody, I can see that the partitions are well distributed...
>>>>>>>>>> >> > Then I'm in the process of using the direct api.
>>>>>>>>>> >> >
>>>>>>>>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <co...@koeninger.org>
>>>>>>>>>> >> > wrote:
>>>>>>>>>> >> >> 60 partitions in and of itself shouldn't be a big performance issue
>>>>>>>>>> >> >> (as long as producers are distributing across partitions evenly).
>>>>>>>>>> >> >>
>>>>>>>>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams
>>>>>>>>>> >> >> <di...@uw.edu>
>>>>>>>>>> >> >> wrote:
>>>>>>>>>> >> >>> Thanks again Cody. Regarding the details: 66 kafka partitions on 3
>>>>>>>>>> >> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>>>>>>>>>> >> >>> issue with the receiver was the large number of partitions. I had
>>>>>>>>>> >> >>> miscounted the disks and so 11*3*2 is how I decided to partition my
>>>>>>>>>> >> >>> topic on insertion (by my own, unjustified reasoning, on a first
>>>>>>>>>> >> >>> attempt). This worked well enough for me, I put 1.7 billion entries
>>>>>>>>>> >> >>> into Kafka on a map reduce job in 5 and a half hours.
>>>>>>>>>> >> >>>
>>>>>>>>>> >> >>> I was concerned using spark 1.5.2 because I'm currently putting my
>>>>>>>>>> >> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library jars
>>>>>>>>>> >> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>>>>>>>>>> >> >>> yesterday, I tried building against 1.5.2. So far it's running without
>>>>>>>>>> >> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
>>>>>>>>>> >> >>> improvement using the same code, but I'll see how the direct api
>>>>>>>>>> >> >>> handles it. In the end I can reduce the number of partitions in Kafka
>>>>>>>>>> >> >>> if it causes big performance issues.
>>>>>>>>>> >> >>>
>>>>>>>>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger
>>>>>>>>>> >> >>> <co...@koeninger.org>
>>>>>>>>>> >> >>> wrote:
>>>>>>>>>> >> >>>> print() isn't really the best way to benchmark things, since it calls
>>>>>>>>>> >> >>>> take(10) under the covers, but 380 records / second for a single
>>>>>>>>>> >> >>>> receiver doesn't sound right in any case.
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>> Am I understanding correctly that you're trying to process a large
>>>>>>>>>> >> >>>> number of already-existing kafka messages, not keep up with an
>>>>>>>>>> >> >>>> incoming stream?  Can you give any details (e.g. hardware, number of
>>>>>>>>>> >> >>>> topicpartitions, etc)?
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>> Really though, I'd try to start with spark 1.6 and direct streams, or
>>>>>>>>>> >> >>>> even just kafkacat, as a baseline.
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>>>>>>>>> >> >>>> <di...@uw.edu> wrote:
>>>>>>>>>> >> >>>>> Hello again. I searched for "backport kafka" in the list archives but
>>>>>>>>>> >> >>>>> couldn't find anything but a post from Spark 0.7.2. I was going to
>>>>>>>>>> >> >>>>> use accumulators to make a counter, but then saw on the Streaming tab
>>>>>>>>>> >> >>>>> the Receiver Statistics. Then I removed all other "functionality"
>>>>>>>>>> >> >>>>> except:
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>>>>>>>>>> >> >>>>>       // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
>>>>>>>>>> >> >>>>>       //   Class<V> valueTypeClass, Class<U> keyDecoderClass,
>>>>>>>>>> >> >>>>>       //   Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams,
>>>>>>>>>> >> >>>>>       //   java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>>>>>>>>> >> >>>>>       .createStream(jssc, byte[].class, byte[].class,
>>>>>>>>>> >> >>>>>         kafka.serializer.DefaultDecoder.class,
>>>>>>>>>> >> >>>>>         kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
>>>>>>>>>> >> >>>>>         StorageLevel.MEMORY_AND_DISK_SER());
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>>     dstream.print();
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing around
>>>>>>>>>> >> >>>>> 380 records / second. Then to get anywhere near my 10% mentioned
>>>>>>>>>> >> >>>>> above, I'd need to run around 21 receivers, assuming 380 records /
>>>>>>>>>> >> >>>>> second, just using the print output. This seems awfully high to me,
>>>>>>>>>> >> >>>>> considering that I wrote 80000+ records a second to Kafka from a
>>>>>>>>>> >> >>>>> mapreduce job, and that my bottleneck was likely Hbase. Again using
>>>>>>>>>> >> >>>>> the 380 estimate, I would need 200+ receivers to reach a similar
>>>>>>>>>> >> >>>>> amount of reads.
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the expected way
>>>>>>>>>> >> >>>>> to use the Kafka streaming API, or am I doing something terribly
>>>>>>>>>> >> >>>>> wrong?
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>> My application looks like
>>>>>>>>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger
>>>>>>>>>> >> >>>>> <co...@koeninger.org>
>>>>>>>>>> >> >>>>> wrote:
>>>>>>>>>> >> >>>>>> Have you tested for read throughput (without writing to hbase, just
>>>>>>>>>> >> >>>>>> deserialize)?
>>>>>>>>>> >> >>>>>>
>>>>>>>>>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible? The
>>>>>>>>>> >> >>>>>> kafka direct stream is available starting with 1.3.  If you're stuck
>>>>>>>>>> >> >>>>>> on 1.2, I believe there have been some attempts to backport it, search
>>>>>>>>>> >> >>>>>> the mailing list archives.
>>>>>>>>>> >> >>>>>>
>>>>>>>>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>>>>>>>>>> >> >>>>>> <di...@uw.edu> wrote:
>>>>>>>>>> >> >>>>>>> I've written an application to get content from a kafka topic with 1.7
>>>>>>>>>> >> >>>>>>> billion entries,  get the protobuf serialized entries, and insert into
>>>>>>>>>> >> >>>>>>> hbase. Currently the environment that I'm running in is Spark 1.2.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
>>>>>>>>>> >> >>>>>>> 0-2500 writes / second. This will take much too long to consume the
>>>>>>>>>> >> >>>>>>> entries.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> I currently believe that the spark kafka receiver is the bottleneck.
>>>>>>>>>> >> >>>>>>> I've tried both 1.2 receivers, with the WAL and without, and didn't
>>>>>>>>>> >> >>>>>>> notice any large performance difference. I've tried many different
>>>>>>>>>> >> >>>>>>> spark configuration options, but can't seem to get better performance.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> I saw 80000 requests / second inserting these records into kafka using
>>>>>>>>>> >> >>>>>>> yarn / hbase / protobuf / kafka in a bulk fashion.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd like to
>>>>>>>>>> >> >>>>>>> at least get 10%.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> My application looks like
>>>>>>>>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> This is my first spark application. I'd appreciate any assistance.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>>
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >
>>>>>>>>>
>>>>>>>>>
>>>>>>>>

Re: Improving performance of a kafka spark streaming app

Posted by Colin Kincaid Williams <di...@uw.edu>.
After a bit of effort I moved from a Spark cluster running 1.5.2 to a
Yarn cluster running 1.6.1 jars. I'm still setting maxRatePerPartition.
The completed batches are no longer showing the number of events
processed in the Streaming UI tab. I'm getting around 4k inserts per
second in hbase, but I haven't yet tried to remove or reset
maxRatePerPartition. I will attach a screenshot of the UI tab. It shows
significantly lower figures for processing and delay times than the
previously posted shot. It also shows the batches as empty, even though
I see the requests hitting hbase.

Then it's possible my issues were related to running on the Spark
1.5.2 cluster. Also is the missing event count in the completed
batches a bug? Should I file an issue?
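
As a rough sanity check on these numbers, here is a back-of-the-envelope
sketch in Java (not part of the application; the class and method names
are made up for illustration). It assumes the direct stream's per-batch
ceiling is maxRatePerPartition * number of Kafka partitions * batch
seconds. As far as I understand the setting, it is a per-partition,
per-second limit, so the 3000 events per batch seen earlier would come
from the 6 Kafka partitions rather than from --num-executors 6. It also
estimates how long 1.7 billion messages take to drain at a steady rate.

```java
public class DrainTimeEstimate {

    // Upper bound on records in one batch for the Kafka direct stream:
    // maxRatePerPartition (records/sec/partition) * partitions * batch seconds.
    static long maxRecordsPerBatch(long maxRatePerPartition, int partitions, double batchSeconds) {
        return Math.round(maxRatePerPartition * partitions * batchSeconds);
    }

    // Hours needed to consume totalRecords at a steady recordsPerSecond.
    static double hoursToDrain(long totalRecords, double recordsPerSecond) {
        return totalRecords / recordsPerSecond / 3600.0;
    }

    public static void main(String[] args) {
        long total = 1_700_000_000L; // messages in the topic, per the thread

        // Settings from the 1.5.2 launch script: 500 per partition, 6 partitions, 1 s batches.
        long cap = maxRecordsPerBatch(500, 6, 1.0);
        System.out.println("max records per 1 s batch: " + cap);

        // Rates mentioned in the thread: the capped rate, the ~4k/s now observed,
        // and the 8000/s goal (10% of the 80000/s seen when loading Kafka).
        double[] rates = {cap, 4000.0, 8000.0};
        for (double rate : rates) {
            System.out.println(rate + " records/s -> " + hoursToDrain(total, rate) + " hours");
        }
    }
}
```

The point of the sketch is only that, at a few thousand inserts per
second, draining the topic is a matter of days, so the rate cap and the
partition count are worth revisiting before anything else.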

On Tue, Jun 21, 2016 at 9:04 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
> Thanks @Cody, I will try that out. In the interim, I tried to validate
> my Hbase cluster by running a random write test and saw 30-40K writes
> per second. This suggests there is noticeable room for improvement.
>
> On Tue, Jun 21, 2016 at 8:32 PM, Cody Koeninger <co...@koeninger.org> wrote:
>> Take HBase out of the equation and just measure what your read
>> performance is by doing something like
>>
>> createDirectStream(...).foreachRDD(_.foreach(println))
>>
>> not take() or print()
>>
>> On Tue, Jun 21, 2016 at 3:19 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>> @Cody I was able to bring my processing time down to a second by
>>> setting maxRatePerPartition as discussed. My bad that I didn't
>>> recognize it as the cause of my scheduling delay.
>>>
>>> Since then I've tried experimenting with a larger Spark Context
>>> duration. I've been trying to get some noticeable improvement
>>> inserting messages from Kafka -> Hbase using the above application.
>>> I'm currently getting around 3500 inserts / second on a 9 node hbase
>>> cluster. So far, I haven't been able to get much more throughput. So
>>> I'm looking for advice here on how I should tune Kafka and Spark for this
>>> job.
>>>
>>> I can create a kafka topic with as many partitions as I want. I can
>>> set the Duration and maxRatePerPartition. I have 1.7 billion messages
>>> that I can insert rather quickly into the Kafka queue, and I'd like to
>>> get them into Hbase as quickly as possible.
>>>
>>> I'm looking for advice regarding # Kafka Topic Partitions / Streaming
>>> Duration / maxRatePerPartition / any other spark settings or code
>>> changes that I should make to try to get a better consumption rate.
>>>
>>> Thanks for all the help so far, this is the first Spark application I
>>> have written.
>>>
>>> On Mon, Jun 20, 2016 at 12:32 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>> I'll try dropping maxRatePerPartition to 400, or maybe even lower.
>>>> However, even at application startup I have this large scheduling
>>>> delay. I will report my progress later on.
>>>>
>>>> On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>>>> If your batch time is 1 second and your average processing time is
>>>>> 1.16 seconds, you're always going to be falling behind.  That would
>>>>> explain why you've built up an hour of scheduling delay after eight
>>>>> hours of running.
>>>>>
>>>>> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>>>> Hi Mich again,
>>>>>>
>>>>>> Regarding batch window, etc. I have provided the sources, but I'm not
>>>>>> currently calling the window function. Did you see the program source?
>>>>>> It's only 100 lines.
>>>>>>
>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>
>>>>>> Then I would expect I'm using defaults, other than what has been shown
>>>>>> in the configuration.
>>>>>>
>>>>>> For example:
>>>>>>
>>>>>> In the launcher configuration I set --conf
>>>>>> spark.streaming.kafka.maxRatePerPartition=500 \ and I believe there
>>>>>> are 500 messages for the duration set in the application:
>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
>>>>>> Duration(1000));
>>>>>>
>>>>>>
>>>>>> Then with the --num-executors 6 \ submit flag, and the
>>>>>> spark.streaming.kafka.maxRatePerPartition=500 I think that's how we
>>>>>> arrive at the 3000 events per batch in the UI, pasted above.
>>>>>>
>>>>>> Feel free to correct me if I'm wrong.
>>>>>>
>>>>>> Then are you suggesting that I set the window?
>>>>>>
>>>>>> Maybe following this as reference:
>>>>>>
>>>>>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>>>>>>
>>>>>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>>>>>> <mi...@gmail.com> wrote:
>>>>>>> Ok
>>>>>>>
>>>>>>> What is the set up for these please?
>>>>>>>
>>>>>>> batch window
>>>>>>> window length
>>>>>>> sliding interval
>>>>>>>
>>>>>>> And also in each batch window how much data do you get in (no of messages in
>>>>>>> the topic whatever)?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> LinkedIn
>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 18 June 2016 at 21:01, Mich Talebzadeh <mi...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> I believe you have an issue with performance?
>>>>>>>>
>>>>>>>> have you checked spark GUI (default 4040) for details including shuffles
>>>>>>>> etc?
>>>>>>>>
>>>>>>>> HTH
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> LinkedIn
>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 18 June 2016 at 20:59, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>>>>>>>
>>>>>>>>> There are 25 nodes in the spark cluster.
>>>>>>>>>
>>>>>>>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>>>>>>>>> <mi...@gmail.com> wrote:
>>>>>>>>> > how many nodes are in your cluster?
>>>>>>>>> >
>>>>>>>>> > --num-executors 6 \
>>>>>>>>> >  --driver-memory 4G \
>>>>>>>>> >  --executor-memory 2G \
>>>>>>>>> >  --total-executor-cores 12 \
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > Dr Mich Talebzadeh
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > LinkedIn
>>>>>>>>> >
>>>>>>>>> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > http://talebzadehmich.wordpress.com
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <di...@uw.edu>
>>>>>>>>> > wrote:
>>>>>>>>> >>
>>>>>>>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>>>>>>>> >> Kafka using the direct api and inserts content into an hbase cluster,
>>>>>>>>> >> as described in this thread. I was away from this project for awhile
>>>>>>>>> >> due to events in my family.
>>>>>>>>> >>
>>>>>>>>> >> Currently my scheduling delay is high, but the processing time is
>>>>>>>>> >> stable around a second. I changed my setup to use 6 kafka partitions
>>>>>>>>> >> on a set of smaller kafka brokers, with fewer disks. I've included
>>>>>>>>> >> some details below, including the script I use to launch the
>>>>>>>>> >> application. I'm using a Spark on Hbase library, whose version is
>>>>>>>>> >> relevant to my Hbase cluster. Is it apparent there is something wrong
>>>>>>>>> >> with my launch method that could be causing the delay, related to the
>>>>>>>>> >> included jars?
>>>>>>>>> >>
>>>>>>>>> >> Or is there something wrong with the very simple approach I'm taking
>>>>>>>>> >> for the application?
>>>>>>>>> >>
>>>>>>>>> >> Any advice is appreciated.
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >> The application:
>>>>>>>>> >>
>>>>>>>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >> From the streaming UI I get something like:
>>>>>>>>> >>
>>>>>>>>> >> Completed Batches (last 1000 out of 27136)
>>>>>>>>> >>
>>>>>>>>> >> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
>>>>>>>>> >> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>>>>>> >> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>>>>>> >> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >> Here's how I'm launching the spark application.
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >> #!/usr/bin/env bash
>>>>>>>>> >>
>>>>>>>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>>>>>>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>>>>>>> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>>>>>>> >>
>>>>>>>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>>>>>>> >>   --class com.example.KafkaToHbase \
>>>>>>>>> >>   --master spark://spark_master:7077 \
>>>>>>>>> >>   --deploy-mode client \
>>>>>>>>> >>   --num-executors 6 \
>>>>>>>>> >>   --driver-memory 4G \
>>>>>>>>> >>   --executor-memory 2G \
>>>>>>>>> >>   --total-executor-cores 12 \
>>>>>>>>> >>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>>>>>>>> >>   --conf spark.app.name="Kafka To Hbase" \
>>>>>>>>> >>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>>>>>>> >>   --conf spark.eventLog.enabled=false \
>>>>>>>>> >>   --conf spark.eventLog.overwrite=true \
>>>>>>>>> >>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>>>>>>> >>   --conf spark.streaming.backpressure.enabled=false \
>>>>>>>>> >>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>>>>>>> >>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>>>>>>> >>   --driver-java-options "-Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*" \
>>>>>>>>> >>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>>>>>>>> >>
>>>>>>>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams
>>>>>>>>> >> <di...@uw.edu>
>>>>>>>>> >> wrote:
>>>>>>>>> >> > Thanks Cody, I can see that the partitions are well distributed...
>>>>>>>>> >> > Then I'm in the process of using the direct api.
>>>>>>>>> >> >
>>>>>>>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <co...@koeninger.org>
>>>>>>>>> >> > wrote:
>>>>>>>>> >> >> 60 partitions in and of itself shouldn't be a big performance issue
>>>>>>>>> >> >> (as long as producers are distributing across partitions evenly).
>>>>>>>>> >> >>
>>>>>>>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams
>>>>>>>>> >> >> <di...@uw.edu>
>>>>>>>>> >> >> wrote:
>>>>>>>>> >> >>> Thanks again Cody. Regarding the details: 66 kafka partitions on 3
>>>>>>>>> >> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>>>>>>>>> >> >>> issue with the receiver was the large number of partitions. I had
>>>>>>>>> >> >>> miscounted the disks, and so 11*3*2 is how I decided to partition
>>>>>>>>> >> >>> my topic on insertion (by my own, unjustified reasoning, on a
>>>>>>>>> >> >>> first attempt). This worked well enough for me: I put 1.7 billion
>>>>>>>>> >> >>> entries into Kafka with a map reduce job in 5 and a half hours.
>>>>>>>>> >> >>>
>>>>>>>>> >> >>> I was concerned about using spark 1.5.2 because I'm currently
>>>>>>>>> >> >>> putting my data into a CDH 5.3 HDFS cluster, using hbase-spark .98
>>>>>>>>> >> >>> library jars built for spark 1.2 on CDH 5.3. But after debugging
>>>>>>>>> >> >>> quite a bit yesterday, I tried building against 1.5.2. So far it's
>>>>>>>>> >> >>> running without issue on a Spark 1.5.2 cluster. I'm not sure there
>>>>>>>>> >> >>> was too much improvement using the same code, but I'll see how the
>>>>>>>>> >> >>> direct api handles it. In the end I can reduce the number of
>>>>>>>>> >> >>> partitions in Kafka if it causes big performance issues.
>>>>>>>>> >> >>>
>>>>>>>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger
>>>>>>>>> >> >>> <co...@koeninger.org>
>>>>>>>>> >> >>> wrote:
>>>>>>>>> >> >>>> print() isn't really the best way to benchmark things, since it
>>>>>>>>> >> >>>> calls take(10) under the covers, but 380 records / second for a
>>>>>>>>> >> >>>> single receiver doesn't sound right in any case.
>>>>>>>>> >> >>>>
>>>>>>>>> >> >>>> Am I understanding correctly that you're trying to process a
>>>>>>>>> >> >>>> large number of already-existing kafka messages, not keep up with
>>>>>>>>> >> >>>> an incoming stream?  Can you give any details (e.g. hardware,
>>>>>>>>> >> >>>> number of topicpartitions, etc)?
>>>>>>>>> >> >>>>
>>>>>>>>> >> >>>> Really though, I'd try to start with spark 1.6 and direct
>>>>>>>>> >> >>>> streams, or even just kafkacat, as a baseline.
>>>>>>>>> >> >>>>
>>>>>>>>> >> >>>>
>>>>>>>>> >> >>>>
>>>>>>>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>>>>>>>> >> >>>> <di...@uw.edu> wrote:
>>>>>>>>> >> >>>>> Hello again. I searched for "backport kafka" in the list
>>>>>>>>> >> >>>>> archives but couldn't find anything but a post from Spark 0.7.2.
>>>>>>>>> >> >>>>> I was going to use accumulators to make a counter, but then saw
>>>>>>>>> >> >>>>> on the Streaming tab the Receiver Statistics. Then I removed all
>>>>>>>>> >> >>>>> other "functionality" except:
>>>>>>>>> >> >>>>>
>>>>>>>>> >> >>>>>
>>>>>>>>> >> >>>>>     // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
>>>>>>>>> >> >>>>>     //     Class<V> valueTypeClass, Class<U> keyDecoderClass,
>>>>>>>>> >> >>>>>     //     Class<T> valueDecoderClass,
>>>>>>>>> >> >>>>>     //     java.util.Map<String,String> kafkaParams,
>>>>>>>>> >> >>>>>     //     java.util.Map<String,Integer> topics,
>>>>>>>>> >> >>>>>     //     StorageLevel storageLevel)
>>>>>>>>> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream =
>>>>>>>>> >> >>>>>         KafkaUtils.createStream(jssc, byte[].class, byte[].class,
>>>>>>>>> >> >>>>>             kafka.serializer.DefaultDecoder.class,
>>>>>>>>> >> >>>>>             kafka.serializer.DefaultDecoder.class,
>>>>>>>>> >> >>>>>             kafkaParamsMap, topicMap,
>>>>>>>>> >> >>>>>             StorageLevel.MEMORY_AND_DISK_SER());
>>>>>>>>> >> >>>>>
>>>>>>>>> >> >>>>>     dstream.print();
>>>>>>>>> >> >>>>>
>>>>>>>>> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
>>>>>>>>> >> >>>>> around 380 records / second. Then to get anywhere near my 10%
>>>>>>>>> >> >>>>> mentioned above, I'd need to run around 21 receivers, assuming
>>>>>>>>> >> >>>>> 380 records / second, just using the print output. This seems
>>>>>>>>> >> >>>>> awfully high to me, considering that I wrote 80000+ records a
>>>>>>>>> >> >>>>> second to Kafka from a mapreduce job, and that my bottleneck was
>>>>>>>>> >> >>>>> likely Hbase. Again using the 380 estimate, I would need 200+
>>>>>>>>> >> >>>>> receivers to reach a similar amount of reads.
>>>>>>>>> >> >>>>>
>>>>>>>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the
>>>>>>>>> >> >>>>> expected way to use the Kafka streaming API, or am I doing
>>>>>>>>> >> >>>>> something terribly wrong?
>>>>>>>>> >> >>>>>
>>>>>>>>> >> >>>>> My application looks like
>>>>>>>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>> >> >>>>>
>>>>>>>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger
>>>>>>>>> >> >>>>> <co...@koeninger.org>
>>>>>>>>> >> >>>>> wrote:
>>>>>>>>> >> >>>>>> Have you tested for read throughput (without writing to hbase,
>>>>>>>>> >> >>>>>> just deserialize)?
>>>>>>>>> >> >>>>>>
>>>>>>>>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?
>>>>>>>>> >> >>>>>> The kafka direct stream is available starting with 1.3.  If
>>>>>>>>> >> >>>>>> you're stuck on 1.2, I believe there have been some attempts to
>>>>>>>>> >> >>>>>> backport it, search the mailing list archives.
>>>>>>>>> >> >>>>>>
>>>>>>>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>>>>>>>>> >> >>>>>> <di...@uw.edu> wrote:
>>>>>>>>> >> >>>>>>> I've written an application to get content from a kafka topic
>>>>>>>>> >> >>>>>>> with 1.7 billion entries, get the protobuf serialized entries,
>>>>>>>>> >> >>>>>>> and insert into hbase. Currently the environment that I'm
>>>>>>>>> >> >>>>>>> running in is Spark 1.2.
>>>>>>>>> >> >>>>>>>
>>>>>>>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
>>>>>>>>> >> >>>>>>> between 0-2500 writes / second. This will take much too long to
>>>>>>>>> >> >>>>>>> consume the entries.
>>>>>>>>> >> >>>>>>>
>>>>>>>>> >> >>>>>>> I currently believe that the spark kafka receiver is the
>>>>>>>>> >> >>>>>>> bottleneck. I've tried both 1.2 receivers, with the WAL and
>>>>>>>>> >> >>>>>>> without, and didn't notice any large performance difference.
>>>>>>>>> >> >>>>>>> I've tried many different spark configuration options, but
>>>>>>>>> >> >>>>>>> can't seem to get better performance.
>>>>>>>>> >> >>>>>>>
>>>>>>>>> >> >>>>>>> I saw 80000 requests / second inserting these records into
>>>>>>>>> >> >>>>>>> kafka using yarn / hbase / protobuf / kafka in a bulk fashion.
>>>>>>>>> >> >>>>>>>
>>>>>>>>> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd
>>>>>>>>> >> >>>>>>> like to at least get 10%.
>>>>>>>>> >> >>>>>>>
>>>>>>>>> >> >>>>>>> My application looks like
>>>>>>>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>> >> >>>>>>>
>>>>>>>>> >> >>>>>>> This is my first spark application. I'd appreciate any
>>>>>>>>> >> >>>>>>> assistance.
>>>>>>>>> >> >>>>>>>
>>>>>>>>> >
>>>>>>>>
>>>>>>>>
>>>>>>>



Re: Improving performance of a kafka spark streaming app

Posted by Colin Kincaid Williams <di...@uw.edu>.
Thanks @Cody, I will try that out. In the interim, I tried to validate
my Hbase cluster by running a random write test and saw 30-40K writes
per second. This suggests there is noticeable room for improvement.
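
To put those rates in perspective, here is a minimal sketch of the
drain-time arithmetic. It assumes the figures mentioned in this thread
(1.7 billion messages, roughly 3500 inserts / second at present, a target
of 10% of the 80000 / second Kafka load rate, and 30-40K writes / second
from the Hbase random write test) and a perfectly steady rate, which a
real job won't have. The class and method names are made up for
illustration.

```java
// Back-of-the-envelope estimate only; names here are illustrative,
// not taken from the application in the gist.
class DrainTimeEstimate {

    // Hours needed to consume `messages` at a steady `perSecond` rate.
    static double hoursToDrain(long messages, double perSecond) {
        return messages / perSecond / 3600.0;
    }

    public static void main(String[] args) {
        long backlog = 1_700_000_000L; // messages in the topic, per the thread

        // Current rate, the 10% target, and the Hbase random write test range.
        double[] ratesPerSecond = {3500, 8000, 30000, 40000};

        for (double rate : ratesPerSecond) {
            System.out.printf("%.0f msg/s -> %.1f hours%n",
                    rate, hoursToDrain(backlog, rate));
        }
    }
}
```

Under those assumptions the backlog takes roughly 135 hours at 3500 /
second, versus something like 12 to 16 hours at the rates the Hbase test
reached.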

On Tue, Jun 21, 2016 at 8:32 PM, Cody Koeninger <co...@koeninger.org> wrote:
> Take HBase out of the equation and just measure what your read
> performance is by doing something like
>
> createDirectStream(...).foreachRDD(_.foreach(println))
>
> not take() or print()
>
> On Tue, Jun 21, 2016 at 3:19 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>> @Cody I was able to bring my processing time down to a second by
>> setting maxRatePerPartition as discussed. My bad that I didn't
>> recognize it as the cause of my scheduling delay.
>>
>> Since then I've tried experimenting with a larger streaming context
>> batch duration. I've been trying to get some noticeable improvement
>> inserting messages from Kafka -> Hbase using the above application.
>> I'm currently getting around 3500 inserts / second on a 9 node hbase
>> cluster. So far, I haven't been able to get much more throughput, so
>> I'm looking for advice here on how I should tune Kafka and Spark for
>> this job.
>>
>> I can create a kafka topic with as many partitions as I want. I can
>> set the Duration and maxRatePerPartition. I have 1.7 billion messages
>> that I can insert rather quickly into the Kafka queue, and I'd like to
>> get them into Hbase as quickly as possible.
>>
>> I'm looking for advice regarding # Kafka Topic Partitions / Streaming
>> Duration / maxRatePerPartition / any other spark settings or code
>> changes that I should make to try to get a better consumption rate.
>>
>> Thanks for all the help so far, this is the first Spark application I
>> have written.
>>
>> On Mon, Jun 20, 2016 at 12:32 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>> I'll try dropping maxRatePerPartition to 400, or maybe even lower.
>>> However, even at application startup I have this large scheduling
>>> delay. I will report my progress later on.
>>>
>>> On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>>> If your batch time is 1 second and your average processing time is
>>>> 1.16 seconds, you're always going to be falling behind.  That would
>>>> explain why you've built up an hour of scheduling delay after eight
>>>> hours of running.
>>>>
>>>> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>>> Hi Mich again,
>>>>>
>>>>> Regarding batch window, etc. I have provided the sources, but I'm not
>>>>> currently calling the window function. Did you see the program source?
>>>>> It's only 100 lines.
>>>>>
>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>
>>>>> Then I would expect I'm using defaults, other than what has been shown
>>>>> in the configuration.
>>>>>
>>>>> For example:
>>>>>
>>>>> In the launcher configuration I set --conf
>>>>> spark.streaming.kafka.maxRatePerPartition=500 \ and I believe there
>>>>> are 500 messages for the duration set in the application:
>>>>> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
>>>>> Duration(1000));
>>>>>
>>>>>
>>>>> Then with the --num-executors 6 \ submit flag, and the
>>>>> spark.streaming.kafka.maxRatePerPartition=500 I think that's how we
>>>>> arrive at the 3000 events per batch in the UI, pasted above.
>>>>>
>>>>> Feel free to correct me if I'm wrong.
>>>>>
>>>>> Then are you suggesting that I set the window?
>>>>>
>>>>> Maybe following this as reference:
>>>>>
>>>>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>>>>>
>>>>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>>>>> <mi...@gmail.com> wrote:
>>>>>> Ok
>>>>>>
>>>>>> What is the set up for these please?
>>>>>>
>>>>>> batch window
>>>>>> window length
>>>>>> sliding interval
>>>>>>
>>>>>> And also, in each batch window, how much data do you get in (number of
>>>>>> messages in the topic, or similar)?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> LinkedIn
>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>
>>>>>>
>>>>>>
>>>>>> http://talebzadehmich.wordpress.com
>>>>>>
>>>>>>
>>>>>>
>>>>>>



Re: Improving performance of a kafka spark streaming app

Posted by Cody Koeninger <co...@koeninger.org>.
Take HBase out of the equation and just measure what your read
performance is by doing something like

createDirectStream(...).foreachRDD(rdd => rdd.foreach(println))

rather than take() or print(), since those only fetch the first few
records of each batch.

On Tue, Jun 21, 2016 at 3:19 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
> @Cody I was able to bring my processing time down to a second by
> setting maxRatePerPartition as discussed. My bad that I didn't
> recognize it as the cause of my scheduling delay.
>
> Since then I've experimented with a larger batch duration for the
> streaming context, trying to get a noticeable improvement inserting
> messages from Kafka -> Hbase with the above application. I'm currently
> getting around 3500 inserts / second on a 9 node hbase cluster, and so
> far I haven't been able to get much more throughput. So I'm looking for
> advice on how I should tune Kafka and Spark for this job.
>
> I can create a kafka topic with as many partitions as I want. I can
> set the Duration and maxRatePerPartition. I have 1.7 billion messages
> that I can insert rather quickly into the Kafka queue, and I'd like to
> get them into Hbase as quickly as possible.
>
> I'm looking for advice regarding # Kafka Topic Partitions / Streaming
> Duration / maxRatePerPartition / any other spark settings or code
> changes that I should make to try to get a better consumption rate.
>
> Thanks for all the help so far, this is the first Spark application I
> have written.
>
> On Mon, Jun 20, 2016 at 12:32 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>> I'll try dropping maxRatePerPartition to 400, or maybe even lower.
>> However, even when the application starts up I have this large
>> scheduling delay. I will report my progress later on.
>>
>> On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>> If your batch time is 1 second and your average processing time is
>>> 1.16 seconds, you're always going to be falling behind.  That would
>>> explain why you've built up an hour of scheduling delay after eight
>>> hours of running.
>>>
>>> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>> Hi Mich again,
>>>>
>>>> Regarding batch window, etc. I have provided the sources, but I'm not
>>>> currently calling the window function. Did you see the program source?
>>>> It's only 100 lines.
>>>>
>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>
>>>> Then I would expect I'm using defaults, other than what has been shown
>>>> in the configuration.
>>>>
>>>> For example:
>>>>
>>>> In the launcher configuration I set --conf
>>>> spark.streaming.kafka.maxRatePerPartition=500 \ and I believe there
>>>> are 500 messages for the duration set in the application:
>>>> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
>>>> Duration(1000));
>>>>
>>>>
>>>> Then with the --num-executors 6 \ submit flag, and the
>>>> spark.streaming.kafka.maxRatePerPartition=500 I think that's how we
>>>> arrive at the 3000 events per batch in the UI, pasted above.
>>>>
>>>> Feel free to correct me if I'm wrong.
>>>>
>>>> Then are you suggesting that I set the window?
>>>>
>>>> Maybe following this as reference:
>>>>
>>>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>>>>
>>>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>>>> <mi...@gmail.com> wrote:
>>>>> Ok
>>>>>
>>>>> What is the set up for these please?
>>>>>
>>>>> batch window
>>>>> window length
>>>>> sliding interval
>>>>>
>>>>> And also in each batch window how much data do you get in (no of messages in
>>>>> the topic whatever)?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn
>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 18 June 2016 at 21:01, Mich Talebzadeh <mi...@gmail.com> wrote:
>>>>>>
>>>>>> I believe you have an issue with performance?
>>>>>>
>>>>>> have you checked spark GUI (default 4040) for details including shuffles
>>>>>> etc?
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> LinkedIn
>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>
>>>>>>
>>>>>>
>>>>>> http://talebzadehmich.wordpress.com
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 18 June 2016 at 20:59, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>>>>>
>>>>>>> There are 25 nodes in the spark cluster.
>>>>>>>
>>>>>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>>>>>>> <mi...@gmail.com> wrote:
>>>>>>> > how many nodes are in your cluster?
>>>>>>> >
>>>>>>> > --num-executors 6 \
>>>>>>> >  --driver-memory 4G \
>>>>>>> >  --executor-memory 2G \
>>>>>>> >  --total-executor-cores 12 \
>>>>>>> >
>>>>>>> >
>>>>>>> > Dr Mich Talebzadeh
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > LinkedIn
>>>>>>> >
>>>>>>> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > http://talebzadehmich.wordpress.com
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <di...@uw.edu>
>>>>>>> > wrote:
>>>>>>> >>
>>>>>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>>>>>> >> Kafka using the direct api and inserts content into an hbase cluster,
>>>>>>> >> as described in this thread. I was away from this project for awhile
>>>>>>> >> due to events in my family.
>>>>>>> >>
>>>>>>> >> Currently my scheduling delay is high, but the processing time is
>>>>>>> >> stable around a second. I changed my setup to use 6 kafka partitions
>>>>>>> >> on a set of smaller kafka brokers, with fewer disks. I've included
>>>>>>> >> some details below, including the script I use to launch the
>>>>>>> >> application. I'm using a Spark on Hbase library, whose version is
>>>>>>> >> relevant to my Hbase cluster. Is it apparent there is something wrong
>>>>>>> >> with my launch method that could be causing the delay, related to the
>>>>>>> >> included jars?
>>>>>>> >>
>>>>>>> >> Or is there something wrong with the very simple approach I'm taking
>>>>>>> >> for the application?
>>>>>>> >>
>>>>>>> >> Any advice is appreciated.
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> The application:
>>>>>>> >>
>>>>>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> From the streaming UI I get something like:
>>>>>>> >>
>>>>>>> >> table Completed Batches (last 1000 out of 27136)
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> Batch Time Input Size Scheduling Delay (?) Processing Time (?) Total
>>>>>>> >> Delay (?) Output Ops: Succeeded/Total
>>>>>>> >>
>>>>>>> >> 2016/06/18 11:21:32 3000 events 1.2 h 1 s 1.2 h 1/1
>>>>>>> >>
>>>>>>> >> 2016/06/18 11:21:31 3000 events 1.2 h 1 s 1.2 h 1/1
>>>>>>> >>
>>>>>>> >> 2016/06/18 11:21:30 3000 events 1.2 h 1 s 1.2 h 1/1
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> Here's how I'm launching the spark application.
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> #!/usr/bin/env bash
>>>>>>> >>
>>>>>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>>>>>> >>
>>>>>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>>>>> >>
>>>>>>> >> export
>>>>>>> >>
>>>>>>> >> HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>>>>> >>
>>>>>>> >> --class com.example.KafkaToHbase \
>>>>>>> >>
>>>>>>> >> --master spark://spark_master:7077 \
>>>>>>> >>
>>>>>>> >> --deploy-mode client \
>>>>>>> >>
>>>>>>> >> --num-executors 6 \
>>>>>>> >>
>>>>>>> >> --driver-memory 4G \
>>>>>>> >>
>>>>>>> >> --executor-memory 2G \
>>>>>>> >>
>>>>>>> >> --total-executor-cores 12 \
>>>>>>> >>
>>>>>>> >> --jars
>>>>>>> >>
>>>>>>> >> /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar
>>>>>>> >> \
>>>>>>> >>
>>>>>>> >> --conf spark.app.name="Kafka To Hbase" \
>>>>>>> >>
>>>>>>> >> --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>>>>> >>
>>>>>>> >> --conf spark.eventLog.enabled=false \
>>>>>>> >>
>>>>>>> >> --conf spark.eventLog.overwrite=true \
>>>>>>> >>
>>>>>>> >> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>>>>> >>
>>>>>>> >> --conf spark.streaming.backpressure.enabled=false \
>>>>>>> >>
>>>>>>> >> --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>>>>> >>
>>>>>>> >> --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>>>>> >>
>>>>>>> >> --driver-java-options
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*
>>>>>>> >> \
>>>>>>> >>
>>>>>>> >> /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable"
>>>>>>> >> "broker1:9092,broker2:9092"
>>>>>>> >>
>>>>>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams
>>>>>>> >> <di...@uw.edu>
>>>>>>> >> wrote:
>>>>>>> >> > Thanks Cody, I can see that the partitions are well distributed...
>>>>>>> >> > Then I'm in the process of using the direct api.
>>>>>>> >> >
>>>>>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <co...@koeninger.org>
>>>>>>> >> > wrote:
>>>>>>> >> >> 60 partitions in and of itself shouldn't be a big performance issue
>>>>>>> >> >> (as long as producers are distributing across partitions evenly).
>>>>>>> >> >>
>>>>>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams
>>>>>>> >> >> <di...@uw.edu>
>>>>>>> >> >> wrote:
>>>>>>> >> >>> Thanks again Cody. Regarding the details 66 kafka partitions on 3
>>>>>>> >> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>>>>>>> >> >>> issue with the receiver was the large number of partitions. I had
>>>>>>> >> >>> miscounted the disks and so 11*3*2 is how I decided to partition
>>>>>>> >> >>> my
>>>>>>> >> >>> topic on insertion, ( by my own, unjustified reasoning, on a first
>>>>>>> >> >>> attempt ) . This worked well enough for me, I put 1.7 billion
>>>>>>> >> >>> entries
>>>>>>> >> >>> into Kafka on a map reduce job in 5 and a half hours.
>>>>>>> >> >>>
>>>>>>> >> >>> I was concerned using spark 1.5.2 because I'm currently putting my
>>>>>>> >> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library
>>>>>>> >> >>> jars
>>>>>>> >> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>>>>>>> >> >>> yesterday, I tried building against 1.5.2. So far it's running
>>>>>>> >> >>> without
>>>>>>> >> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
>>>>>>> >> >>> improvement using the same code, but I'll see how the direct api
>>>>>>> >> >>> handles it. In the end I can reduce the number of partitions in
>>>>>>> >> >>> Kafka
>>>>>>> >> >>> if it causes big performance issues.
>>>>>>> >> >>>
>>>>>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger
>>>>>>> >> >>> <co...@koeninger.org>
>>>>>>> >> >>> wrote:
>>>>>>> >> >>>> print() isn't really the best way to benchmark things, since it
>>>>>>> >> >>>> calls
>>>>>>> >> >>>> take(10) under the covers, but 380 records / second for a single
>>>>>>> >> >>>> receiver doesn't sound right in any case.
>>>>>>> >> >>>>
>>>>>>> >> >>>> Am I understanding correctly that you're trying to process a
>>>>>>> >> >>>> large
>>>>>>> >> >>>> number of already-existing kafka messages, not keep up with an
>>>>>>> >> >>>> incoming stream?  Can you give any details (e.g. hardware, number
>>>>>>> >> >>>> of
>>>>>>> >> >>>> topicpartitions, etc)?
>>>>>>> >> >>>>
>>>>>>> >> >>>> Really though, I'd try to start with spark 1.6 and direct
>>>>>>> >> >>>> streams, or
>>>>>>> >> >>>> even just kafkacat, as a baseline.
>>>>>>> >> >>>>
>>>>>>> >> >>>>
>>>>>>> >> >>>>
>>>>>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>>>>>> >> >>>> <di...@uw.edu> wrote:
>>>>>>> >> >>>>> Hello again. I searched for "backport kafka" in the list
>>>>>>> >> >>>>> archives
>>>>>>> >> >>>>> but
>>>>>>> >> >>>>> couldn't find anything but a post from Spark 0.7.2 . I was going
>>>>>>> >> >>>>> to
>>>>>>> >> >>>>> use accumulators to make a counter, but then saw on the
>>>>>>> >> >>>>> Streaming
>>>>>>> >> >>>>> tab
>>>>>>> >> >>>>> the Receiver Statistics. Then I removed all other
>>>>>>> >> >>>>> "functionality"
>>>>>>> >> >>>>> except:
>>>>>>> >> >>>>>
>>>>>>> >> >>>>>
>>>>>>> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream =
>>>>>>> >> >>>>> KafkaUtils
>>>>>>> >> >>>>>       //createStream(JavaStreamingContext jssc,Class<K>
>>>>>>> >> >>>>> keyTypeClass,Class<V> valueTypeClass, Class<U> keyDecoderClass,
>>>>>>> >> >>>>> Class<T> valueDecoderClass, java.util.Map<String,String>
>>>>>>> >> >>>>> kafkaParams,
>>>>>>> >> >>>>> java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>>>>>> >> >>>>>       .createStream(jssc, byte[].class, byte[].class,
>>>>>>> >> >>>>> kafka.serializer.DefaultDecoder.class,
>>>>>>> >> >>>>> kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
>>>>>>> >> >>>>> StorageLevel.MEMORY_AND_DISK_SER());
>>>>>>> >> >>>>>
>>>>>>> >> >>>>>        dstream.print();
>>>>>>> >> >>>>>
>>>>>>> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
>>>>>>> >> >>>>> around
>>>>>>> >> >>>>> 380 records / second. Then to get anywhere near my 10% mentioned
>>>>>>> >> >>>>> above, I'd need to run around 21 receivers, assuming 380 records
>>>>>>> >> >>>>> /
>>>>>>> >> >>>>> second, just using the print output. This seems awfully high to
>>>>>>> >> >>>>> me,
>>>>>>> >> >>>>> considering that I wrote 80000+ records a second to Kafka from a
>>>>>>> >> >>>>> mapreduce job, and that my bottleneck was likely Hbase. Again
>>>>>>> >> >>>>> using
>>>>>>> >> >>>>> the 380 estimate, I would need 200+ receivers to reach a similar
>>>>>>> >> >>>>> amount of reads.
>>>>>>> >> >>>>>
>>>>>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the
>>>>>>> >> >>>>> expected
>>>>>>> >> >>>>> way
>>>>>>> >> >>>>> to use the Kafka streaming API, or am I doing something terribly
>>>>>>> >> >>>>> wrong?
>>>>>>> >> >>>>>
>>>>>>> >> >>>>> My application looks like
>>>>>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>> >> >>>>>
>>>>>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger
>>>>>>> >> >>>>> <co...@koeninger.org>
>>>>>>> >> >>>>> wrote:
>>>>>>> >> >>>>>> Have you tested for read throughput (without writing to hbase,
>>>>>>> >> >>>>>> just
>>>>>>> >> >>>>>> deserialize)?
>>>>>>> >> >>>>>>
>>>>>>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?
>>>>>>> >> >>>>>> The
>>>>>>> >> >>>>>> kafka direct stream is available starting with 1.3.  If you're
>>>>>>> >> >>>>>> stuck
>>>>>>> >> >>>>>> on 1.2, I believe there have been some attempts to backport it,
>>>>>>> >> >>>>>> search
>>>>>>> >> >>>>>> the mailing list archives.
>>>>>>> >> >>>>>>
>>>>>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>>>>>>> >> >>>>>> <di...@uw.edu> wrote:
>>>>>>> >> >>>>>>> I've written an application to get content from a kafka topic
>>>>>>> >> >>>>>>> with
>>>>>>> >> >>>>>>> 1.7
>>>>>>> >> >>>>>>> billion entries,  get the protobuf serialized entries, and
>>>>>>> >> >>>>>>> insert
>>>>>>> >> >>>>>>> into
>>>>>>> >> >>>>>>> hbase. Currently the environment that I'm running in is Spark
>>>>>>> >> >>>>>>> 1.2.
>>>>>>> >> >>>>>>>
>>>>>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
>>>>>>> >> >>>>>>> between
>>>>>>> >> >>>>>>> 0-2500 writes / second. This will take much too long to
>>>>>>> >> >>>>>>> consume
>>>>>>> >> >>>>>>> the
>>>>>>> >> >>>>>>> entries.
>>>>>>> >> >>>>>>>
>>>>>>> >> >>>>>>> I currently believe that the spark kafka receiver is the
>>>>>>> >> >>>>>>> bottleneck.
>>>>>>> >> >>>>>>> I've tried both 1.2 receivers, with the WAL and without, and
>>>>>>> >> >>>>>>> didn't
>>>>>>> >> >>>>>>> notice any large performance difference. I've tried many
>>>>>>> >> >>>>>>> different
>>>>>>> >> >>>>>>> spark configuration options, but can't seem to get better
>>>>>>> >> >>>>>>> performance.
>>>>>>> >> >>>>>>>
>>>>>>> >> >>>>>>> I saw 80000 requests / second inserting these records into
>>>>>>> >> >>>>>>> kafka
>>>>>>> >> >>>>>>> using
>>>>>>> >> >>>>>>> yarn / hbase / protobuf / kafka in a bulk fashion.
>>>>>>> >> >>>>>>>
>>>>>>> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd
>>>>>>> >> >>>>>>> like to
>>>>>>> >> >>>>>>> at least get 10%.
>>>>>>> >> >>>>>>>
>>>>>>> >> >>>>>>> My application looks like
>>>>>>> >> >>>>>>>
>>>>>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>> >> >>>>>>>
>>>>>>> >> >>>>>>> This is my first spark application. I'd appreciate any
>>>>>>> >> >>>>>>> assistance.



Re: Improving performance of a kafka spark streaming app

Posted by Colin Kincaid Williams <di...@uw.edu>.
@Cody I was able to bring my processing time down to a second by
setting maxRatePerPartition as discussed. My bad that I didn't
recognize it as the cause of my scheduling delay.
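
As a rough check on why a processing time above the batch interval
shows up as scheduling delay, here is a back-of-the-envelope sketch in
plain Java (no Spark dependency). It uses figures quoted further down
in this thread: a 1 second batch interval, the 1.16 second average
processing time Cody mentions, and the 27136 completed batches shown in
the streaming UI. The class and method names are made up for
illustration, and the model assumes every batch overruns by the same
amount, which is a simplification.

```java
// Illustrative only: a simplified model of how scheduling delay builds up
// when each batch takes longer to process than the batch interval.
final class SchedulingDelayEstimate {

    /** Backlog in seconds after the given number of batches. */
    static double backlogSeconds(long batches, double batchIntervalSec, double processingSec) {
        // Each batch that overruns pushes every later batch back by the overrun.
        double lagPerBatch = Math.max(0.0, processingSec - batchIntervalSec);
        return batches * lagPerBatch;
    }

    public static void main(String[] args) {
        long batches = 27136;          // completed batches reported by the streaming UI
        double batchInterval = 1.0;    // new Duration(1000)
        double processing = 1.16;      // average processing time quoted in the thread
        double backlog = backlogSeconds(batches, batchInterval, processing);
        System.out.println("backlog seconds: " + backlog);
        System.out.println("backlog hours:   " + backlog / 3600.0);
    }
}
```

Under those assumptions the backlog comes out at roughly 1.2 hours,
which is in line with the scheduling delay the UI reported.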

Since then I've experimented with a larger batch duration for the
streaming context, trying to get a noticeable improvement inserting
messages from Kafka -> Hbase with the above application. I'm currently
getting around 3500 inserts / second on a 9 node hbase cluster, and so
far I haven't been able to get much more throughput. So I'm looking for
advice on how I should tune Kafka and Spark for this job.

I can create a kafka topic with as many partitions as I want. I can
set the Duration and maxRatePerPartition. I have 1.7 billion messages
that I can insert rather quickly into the Kafka queue, and I'd like to
get them into Hbase as quickly as possible.
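
To put the rates in this thread into perspective, the following small
Java sketch estimates how long 1.7 billion messages take to drain at a
steady rate. The 3500 / second figure is the current insert rate
mentioned above, and 8000 / second is the 10% of 80000 target from the
original post. The class and method names are hypothetical, and a
constant rate with no downtime is assumed.

```java
// Illustrative only: time to consume a fixed backlog at a constant rate.
final class DrainTimeEstimate {

    /** Hours needed to consume `messages` at `messagesPerSecond`. */
    static double hoursToDrain(long messages, double messagesPerSecond) {
        if (messagesPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be positive");
        }
        return messages / messagesPerSecond / 3600.0;
    }

    public static void main(String[] args) {
        long backlog = 1_700_000_000L;       // messages in the Kafka topic
        double currentRate = 3500.0;         // inserts / second seen so far
        double targetRate = 80_000.0 / 10.0; // 10% of the 80000 / second write rate

        System.out.println("hours at current rate: " + hoursToDrain(backlog, currentRate));
        System.out.println("hours at target rate:  " + hoursToDrain(backlog, targetRate));
    }
}
```

At 3500 / second this works out to about 135 hours (between five and
six days); at 8000 / second it is about 59 hours.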

I'm looking for advice regarding # Kafka Topic Partitions / Streaming
Duration / maxRatePerPartition / any other spark settings or code
changes that I should make to try to get a better consumption rate.
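
How these three settings interact can be sketched with some simple
arithmetic. spark.streaming.kafka.maxRatePerPartition is documented as
a per-second limit for each Kafka partition of a direct stream, so the
upper bound on a batch is partitions x rate x batch seconds. With the 6
Kafka partitions, rate of 500 and 1 second Duration quoted below, that
gives the 3000 events per batch seen in the UI (the cap follows the
partition count rather than the number of executors). The Java below is
a minimal sketch of that arithmetic only; the class and method names
are invented for illustration, and a higher cap helps only as long as
each batch is still processed within the batch interval.

```java
// Illustrative only: arithmetic for the direct stream's per-batch cap.
final class BatchCapEstimate {

    /** Upper bound on records in one batch for a direct stream. */
    static long maxRecordsPerBatch(int kafkaPartitions, long maxRatePerPartition, long batchMillis) {
        // maxRatePerPartition is records per second per Kafka partition.
        return kafkaPartitions * maxRatePerPartition * batchMillis / 1000L;
    }

    /** Smallest per-partition rate that allows the target overall rate. */
    static long requiredRatePerPartition(long targetRecordsPerSecond, int kafkaPartitions) {
        // Integer ceiling of target / partitions.
        return (targetRecordsPerSecond + kafkaPartitions - 1) / kafkaPartitions;
    }

    public static void main(String[] args) {
        System.out.println("cap per batch: " + maxRecordsPerBatch(6, 500, 1000));
        System.out.println("rate needed for 8000/s on 6 partitions: "
                + requiredRatePerPartition(8000, 6));
    }
}
```

For example, reaching 8000 records / second with 6 partitions would
need maxRatePerPartition of at least 1334, while 66 partitions would
need only 122 each.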

Thanks for all the help so far, this is the first Spark application I
have written.

On Mon, Jun 20, 2016 at 12:32 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
> I'll try dropping maxRatePerPartition to 400, or maybe even lower.
> However, even when the application starts up I have this large
> scheduling delay. I will report my progress later on.
>
> On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger <co...@koeninger.org> wrote:
>> If your batch time is 1 second and your average processing time is
>> 1.16 seconds, you're always going to be falling behind.  That would
>> explain why you've built up an hour of scheduling delay after eight
>> hours of running.
>>
>> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>> Hi Mich again,
>>>
>>> Regarding batch window, etc. I have provided the sources, but I'm not
>>> currently calling the window function. Did you see the program source?
>>> It's only 100 lines.
>>>
>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>
>>> Then I would expect I'm using defaults, other than what has been shown
>>> in the configuration.
>>>
>>> For example:
>>>
>>> In the launcher configuration I set --conf
>>> spark.streaming.kafka.maxRatePerPartition=500 \ and I believe there
>>> are 500 messages for the duration set in the application:
>>> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
>>> Duration(1000));
>>>
>>>
>>> Then with the --num-executors 6 \ submit flag, and the
>>> spark.streaming.kafka.maxRatePerPartition=500 I think that's how we
>>> arrive at the 3000 events per batch in the UI, pasted above.
>>>
>>> Feel free to correct me if I'm wrong.
>>>
>>> Then are you suggesting that I set the window?
>>>
>>> Maybe following this as reference:
>>>
>>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>>>
>>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>>> <mi...@gmail.com> wrote:
>>>> Ok
>>>>
>>>> What is the set up for these please?
>>>>
>>>> batch window
>>>> window length
>>>> sliding interval
>>>>
>>>> And also in each batch window how much data do you get in (no of messages in
>>>> the topic whatever)?
>>>>
>>>>
>>>>
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn
>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>>
>>>> On 18 June 2016 at 21:01, Mich Talebzadeh <mi...@gmail.com> wrote:
>>>>>
>>>>> I believe you have an issue with performance?
>>>>>
>>>>> have you checked spark GUI (default 4040) for details including shuffles
>>>>> etc?
>>>>>
>>>>> HTH
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn
>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 18 June 2016 at 20:59, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>>>>
>>>>>> There are 25 nodes in the spark cluster.
>>>>>>
>>>>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>>>>>> <mi...@gmail.com> wrote:
>>>>>> > how many nodes are in your cluster?
>>>>>> >
>>>>>> > --num-executors 6 \
>>>>>> >  --driver-memory 4G \
>>>>>> >  --executor-memory 2G \
>>>>>> >  --total-executor-cores 12 \
>>>>>> >
>>>>>> >
>>>>>> > Dr Mich Talebzadeh
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > LinkedIn
>>>>>> >
>>>>>> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > http://talebzadehmich.wordpress.com
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <di...@uw.edu>
>>>>>> > wrote:
>>>>>> >>
>>>>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>>>>> >> Kafka using the direct api and inserts content into an hbase cluster,
>>>>>> >> as described in this thread. I was away from this project for awhile
>>>>>> >> due to events in my family.
>>>>>> >>
>>>>>> >> Currently my scheduling delay is high, but the processing time is
>>>>>> >> stable around a second. I changed my setup to use 6 kafka partitions
>>>>>> >> on a set of smaller kafka brokers, with fewer disks. I've included
>>>>>> >> some details below, including the script I use to launch the
>>>>>> >> application. I'm using a Spark on Hbase library, whose version is
>>>>>> >> relevant to my Hbase cluster. Is it apparent there is something wrong
>>>>>> >> with my launch method that could be causing the delay, related to the
>>>>>> >> included jars?
>>>>>> >>
>>>>>> >> Or is there something wrong with the very simple approach I'm taking
>>>>>> >> for the application?
>>>>>> >>
>>>>>> >> Any advice is appreciated.
>>>>>> >>
>>>>>> >>
>>>>>> >> The application:
>>>>>> >>
>>>>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>> >>
>>>>>> >>
>>>>>> >> From the streaming UI I get something like:
>>>>>> >>
>>>>>> >> table Completed Batches (last 1000 out of 27136)
>>>>>> >>
>>>>>> >>
>>>>>> >> Batch Time Input Size Scheduling Delay (?) Processing Time (?) Total
>>>>>> >> Delay (?) Output Ops: Succeeded/Total
>>>>>> >>
>>>>>> >> 2016/06/18 11:21:32 3000 events 1.2 h 1 s 1.2 h 1/1
>>>>>> >>
>>>>>> >> 2016/06/18 11:21:31 3000 events 1.2 h 1 s 1.2 h 1/1
>>>>>> >>
>>>>>> >> 2016/06/18 11:21:30 3000 events 1.2 h 1 s 1.2 h 1/1
>>>>>> >>
>>>>>> >>
>>>>>> >> Here's how I'm launching the spark application.
>>>>>> >>
>>>>>> >>
>>>>>> >> #!/usr/bin/env bash
>>>>>> >>
>>>>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>>>>> >>
>>>>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>>>> >>
>>>>>> >> export
>>>>>> >>
>>>>>> >> HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>>>> >>
>>>>>> >>
>>>>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>>>> >>
>>>>>> >> --class com.example.KafkaToHbase \
>>>>>> >>
>>>>>> >> --master spark://spark_master:7077 \
>>>>>> >>
>>>>>> >> --deploy-mode client \
>>>>>> >>
>>>>>> >> --num-executors 6 \
>>>>>> >>
>>>>>> >> --driver-memory 4G \
>>>>>> >>
>>>>>> >> --executor-memory 2G \
>>>>>> >>
>>>>>> >> --total-executor-cores 12 \
>>>>>> >>
>>>>>> >> --jars
>>>>>> >>
>>>>>> >> /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar
>>>>>> >> \
>>>>>> >>
>>>>>> >> --conf spark.app.name="Kafka To Hbase" \
>>>>>> >>
>>>>>> >> --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>>>> >>
>>>>>> >> --conf spark.eventLog.enabled=false \
>>>>>> >>
>>>>>> >> --conf spark.eventLog.overwrite=true \
>>>>>> >>
>>>>>> >> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>>>> >>
>>>>>> >> --conf spark.streaming.backpressure.enabled=false \
>>>>>> >>
>>>>>> >> --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>>>> >>
>>>>>> >> --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>>>> >>
>>>>>> >> --driver-java-options
>>>>>> >>
>>>>>> >>
>>>>>> >> -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*
>>>>>> >> \
>>>>>> >>
>>>>>> >> /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable"
>>>>>> >> "broker1:9092,broker2:9092"
>>>>>> >>
>>>>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams
>>>>>> >> <di...@uw.edu>
>>>>>> >> wrote:
>>>>>> >> > Thanks Cody, I can see that the partitions are well distributed...
>>>>>> >> > Then I'm in the process of using the direct api.
>>>>>> >> >
>>>>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <co...@koeninger.org>
>>>>>> >> > wrote:
>>>>>> >> >> 60 partitions in and of itself shouldn't be a big performance issue
>>>>>> >> >> (as long as producers are distributing across partitions evenly).
>>>>>> >> >>
>>>>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams
>>>>>> >> >> <di...@uw.edu>
>>>>>> >> >> wrote:
>>>>>> >> >>> Thanks again Cody. Regarding the details 66 kafka partitions on 3
>>>>>> >> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>>>>>> >> >>> issue with the receiver was the large number of partitions. I had
>>>>>> >> >>> miscounted the disks and so 11*3*2 is how I decided to partition
>>>>>> >> >>> my
>>>>>> >> >>> topic on insertion, ( by my own, unjustified reasoning, on a first
>>>>>> >> >>> attempt ) . This worked well enough for me, I put 1.7 billion
>>>>>> >> >>> entries
>>>>>> >> >>> into Kafka on a map reduce job in 5 and a half hours.
>>>>>> >> >>>
>>>>>> >> >>> I was concerned using spark 1.5.2 because I'm currently putting my
>>>>>> >> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library
>>>>>> >> >>> jars
>>>>>> >> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>>>>>> >> >>> yesterday, I tried building against 1.5.2. So far it's running
>>>>>> >> >>> without
>>>>>> >> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
>>>>>> >> >>> improvement using the same code, but I'll see how the direct api
>>>>>> >> >>> handles it. In the end I can reduce the number of partitions in
>>>>>> >> >>> Kafka
>>>>>> >> >>> if it causes big performance issues.
>>>>>> >> >>>
>>>>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger
>>>>>> >> >>> <co...@koeninger.org>
>>>>>> >> >>> wrote:
>>>>>> >> >>>> print() isn't really the best way to benchmark things, since it
>>>>>> >> >>>> calls
>>>>>> >> >>>> take(10) under the covers, but 380 records / second for a single
>>>>>> >> >>>> receiver doesn't sound right in any case.
>>>>>> >> >>>>
>>>>>> >> >>>> Am I understanding correctly that you're trying to process a
>>>>>> >> >>>> large
>>>>>> >> >>>> number of already-existing kafka messages, not keep up with an
>>>>>> >> >>>> incoming stream?  Can you give any details (e.g. hardware, number
>>>>>> >> >>>> of
>>>>>> >> >>>> topicpartitions, etc)?
>>>>>> >> >>>>
>>>>>> >> >>>> Really though, I'd try to start with spark 1.6 and direct
>>>>>> >> >>>> streams, or
>>>>>> >> >>>> even just kafkacat, as a baseline.
>>>>>> >> >>>>
>>>>>> >> >>>>
>>>>>> >> >>>>
>>>>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>>>>> >> >>>> <di...@uw.edu> wrote:
>>>>>> >> >>>>> Hello again. I searched for "backport kafka" in the list
>>>>>> >> >>>>> archives
>>>>>> >> >>>>> but
>>>>>> >> >>>>> couldn't find anything but a post from Spark 0.7.2 . I was going
>>>>>> >> >>>>> to
>>>>>> >> >>>>> use accumulators to make a counter, but then saw on the
>>>>>> >> >>>>> Streaming
>>>>>> >> >>>>> tab
>>>>>> >> >>>>> the Receiver Statistics. Then I removed all other
>>>>>> >> >>>>> "functionality"
>>>>>> >> >>>>> except:
>>>>>> >> >>>>>
>>>>>> >> >>>>>
>>>>>> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream =
>>>>>> >> >>>>> KafkaUtils
>>>>>> >> >>>>>       //createStream(JavaStreamingContext jssc,Class<K>
>>>>>> >> >>>>> keyTypeClass,Class<V> valueTypeClass, Class<U> keyDecoderClass,
>>>>>> >> >>>>> Class<T> valueDecoderClass, java.util.Map<String,String>
>>>>>> >> >>>>> kafkaParams,
>>>>>> >> >>>>> java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>>>>> >> >>>>>       .createStream(jssc, byte[].class, byte[].class,
>>>>>> >> >>>>> kafka.serializer.DefaultDecoder.class,
>>>>>> >> >>>>> kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
>>>>>> >> >>>>> StorageLevel.MEMORY_AND_DISK_SER());
>>>>>> >> >>>>>
>>>>>> >> >>>>>        dstream.print();
>>>>>> >> >>>>>
>>>>>> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
>>>>>> >> >>>>> around
>>>>>> >> >>>>> 380 records / second. Then to get anywhere near my 10% mentioned
>>>>>> >> >>>>> above, I'd need to run around 21 receivers, assuming 380 records
>>>>>> >> >>>>> /
>>>>>> >> >>>>> second, just using the print output. This seems awfully high to
>>>>>> >> >>>>> me,
>>>>>> >> >>>>> considering that I wrote 80000+ records a second to Kafka from a
>>>>>> >> >>>>> mapreduce job, and that my bottleneck was likely Hbase. Again
>>>>>> >> >>>>> using
>>>>>> >> >>>>> the 380 estimate, I would need 200+ receivers to reach a similar
>>>>>> >> >>>>> amount of reads.
>>>>>> >> >>>>>
>>>>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the
>>>>>> >> >>>>> expected
>>>>>> >> >>>>> way
>>>>>> >> >>>>> to use the Kafka streaming API, or am I doing something terribly
>>>>>> >> >>>>> wrong?
>>>>>> >> >>>>>
>>>>>> >> >>>>> My application looks like
>>>>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>> >> >>>>>
>>>>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger
>>>>>> >> >>>>> <co...@koeninger.org>
>>>>>> >> >>>>> wrote:
>>>>>> >> >>>>>> Have you tested for read throughput (without writing to hbase,
>>>>>> >> >>>>>> just
>>>>>> >> >>>>>> deserialize)?
>>>>>> >> >>>>>>
>>>>>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?
>>>>>> >> >>>>>> The
>>>>>> >> >>>>>> kafka direct stream is available starting with 1.3.  If you're
>>>>>> >> >>>>>> stuck
>>>>>> >> >>>>>> on 1.2, I believe there have been some attempts to backport it,
>>>>>> >> >>>>>> search
>>>>>> >> >>>>>> the mailing list archives.
>>>>>> >> >>>>>>
>>>>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>>>>>> >> >>>>>> <di...@uw.edu> wrote:
>>>>>> >> >>>>>>> I've written an application to get content from a kafka topic
>>>>>> >> >>>>>>> with
>>>>>> >> >>>>>>> 1.7
>>>>>> >> >>>>>>> billion entries,  get the protobuf serialized entries, and
>>>>>> >> >>>>>>> insert
>>>>>> >> >>>>>>> into
>>>>>> >> >>>>>>> hbase. Currently the environment that I'm running in is Spark
>>>>>> >> >>>>>>> 1.2.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
>>>>>> >> >>>>>>> between
>>>>>> >> >>>>>>> 0-2500 writes / second. This will take much too long to
>>>>>> >> >>>>>>> consume
>>>>>> >> >>>>>>> the
>>>>>> >> >>>>>>> entries.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> I currently believe that the spark kafka receiver is the
>>>>>> >> >>>>>>> bottleneck.
>>>>>> >> >>>>>>> I've tried both 1.2 receivers, with the WAL and without, and
>>>>>> >> >>>>>>> didn't
>>>>>> >> >>>>>>> notice any large performance difference. I've tried many
>>>>>> >> >>>>>>> different
>>>>>> >> >>>>>>> spark configuration options, but can't seem to get better
>>>>>> >> >>>>>>> performance.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> I saw 80000 requests / second inserting these records into
>>>>>> >> >>>>>>> kafka
>>>>>> >> >>>>>>> using
>>>>>> >> >>>>>>> yarn / hbase / protobuf / kafka in a bulk fashion.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd
>>>>>> >> >>>>>>> like to
>>>>>> >> >>>>>>> at least get 10%.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> My application looks like
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> This is my first spark application. I'd appreciate any
>>>>>> >> >>>>>>> assistance.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>>
>>>>>> >> >>>>
>>>>>> >> >>>>
>>>>>> >> >>>>
>>>>>> >>
>>>>>> >>
>>>>>> >
>>>>>
>>>>>
>>>>



Re: Improving performance of a kafka spark streaming app

Posted by Colin Kincaid Williams <di...@uw.edu>.
I'll try dropping maxRatePerPartition to 400, or maybe even lower.
However, even at application startup I see this large scheduling
delay. I will report my progress later on.
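
A rough sketch of the arithmetic behind that change, under two assumptions
that are not confirmed anywhere in this thread: that
spark.streaming.kafka.maxRatePerPartition is a per-second, per-partition cap
(so the 3000 events per batch come from the 6 Kafka partitions, not the 6
executors), and that processing time scales linearly with batch size. The
class and method names are made up for illustration.

```java
import java.util.Locale;

/** Hypothetical helper: estimates batch size and processing time for a given rate cap. */
final class BatchSizeEstimate {

    /** Upper bound on records per batch: rate per partition x partitions x batch seconds. */
    static long maxEventsPerBatch(int maxRatePerPartition, int partitions, double batchIntervalSec) {
        return Math.round(maxRatePerPartition * partitions * batchIntervalSec);
    }

    /** Assumes processing time is proportional to batch size (an assumption, not a measurement). */
    static double scaledProcessingSec(double observedSec, int observedRate, int newRate) {
        return observedSec * newRate / observedRate;
    }

    public static void main(String[] args) {
        int partitions = 6;            // Kafka partitions in the current setup
        double batchIntervalSec = 1.0; // new Duration(1000) in the application
        double observedSec = 1.16;     // average processing time quoted at rate 500

        for (int rate : new int[] {500, 400, 300}) {
            long events = maxEventsPerBatch(rate, partitions, batchIntervalSec);
            double estSec = scaledProcessingSec(observedSec, 500, rate);
            // The job keeps up only if the estimate stays below the batch interval.
            System.out.printf(Locale.ROOT, "rate=%d -> %d events/batch, ~%.2f s processing, keeps up: %b%n",
                    rate, events, estSec, estSec < batchIntervalSec);
        }
    }
}
```

If the linear assumption holds, 400 per partition would land just under the
1 second batch interval, which leaves very little headroom; a lower cap or a
longer batch duration may be safer.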

On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger <co...@koeninger.org> wrote:
> If your batch time is 1 second and your average processing time is
> 1.16 seconds, you're always going to be falling behind.  That would
> explain why you've built up an hour of scheduling delay after eight
> hours of running.
>
> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>> Hi Mich again,
>>
>> Regarding batch window, etc. I have provided the sources, but I'm not
>> currently calling the window function. Did you see the program source?
>> It's only 100 lines.
>>
>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>
>> Then I would expect I'm using defaults, other than what has been shown
>> in the configuration.
>>
>> For example:
>>
>> In the launcher configuration I set --conf
>> spark.streaming.kafka.maxRatePerPartition=500 \ and I believe there
>> are 500 messages for the duration set in the application:
>> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
>> Duration(1000));
>>
>>
>> Then with the --num-executors 6 \ submit flag, and the
>> spark.streaming.kafka.maxRatePerPartition=500 I think that's how we
>> arrive at the 3000 events per batch in the UI, pasted above.
>>
>> Feel free to correct me if I'm wrong.
>>
>> Then are you suggesting that I set the window?
>>
>> Maybe following this as reference:
>>
>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>>
>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>> <mi...@gmail.com> wrote:
>>> Ok
>>>
>>> What is the set up for these please?
>>>
>>> batch window
>>> window length
>>> sliding interval
>>>
>>> And also in each batch window how much data do you get in (no of messages in
>>> the topic whatever)?
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>>
>>> On 18 June 2016 at 21:01, Mich Talebzadeh <mi...@gmail.com> wrote:
>>>>
>>>> I believe you have an issue with performance?
>>>>
>>>> have you checked spark GUI (default 4040) for details including shuffles
>>>> etc?
>>>>
>>>> HTH
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn
>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>>
>>>> On 18 June 2016 at 20:59, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>>>
>>>>> There are 25 nodes in the spark cluster.
>>>>>
>>>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>>>>> <mi...@gmail.com> wrote:
>>>>> > how many nodes are in your cluster?
>>>>> >
>>>>> > --num-executors 6 \
>>>>> >  --driver-memory 4G \
>>>>> >  --executor-memory 2G \
>>>>> >  --total-executor-cores 12 \
>>>>> >
>>>>> >
>>>>> > Dr Mich Talebzadeh
>>>>> >
>>>>> >
>>>>> >
>>>>> > LinkedIn
>>>>> >
>>>>> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> >
>>>>> >
>>>>> >
>>>>> > http://talebzadehmich.wordpress.com
>>>>> >
>>>>> >
>>>>> >
>>>>> >
>>>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <di...@uw.edu>
>>>>> > wrote:
>>>>> >>
>>>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>>>> >> Kafka using the direct api and inserts content into an hbase cluster,
>>>>> >> as described in this thread. I was away from this project for a while
>>>>> >> due to events in my family.
>>>>> >>
>>>>> >> Currently my scheduling delay is high, but the processing time is
>>>>> >> stable around a second. I changed my setup to use 6 kafka partitions
>>>>> >> on a set of smaller kafka brokers, with fewer disks. I've included
>>>>> >> some details below, including the script I use to launch the
>>>>> >> application. I'm using a Spark on Hbase library, whose version is
>>>>> >> relevant to my Hbase cluster. Is it apparent there is something wrong
>>>>> >> with my launch method that could be causing the delay, related to the
>>>>> >> included jars?
>>>>> >>
>>>>> >> Or is there something wrong with the very simple approach I'm taking
>>>>> >> for the application?
>>>>> >>
>>>>> >> Any advice is appreciated.
>>>>> >>
>>>>> >>
>>>>> >> The application:
>>>>> >>
>>>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>> >>
>>>>> >>
>>>>> >> From the streaming UI I get something like:
>>>>> >>
>>>>> >> Completed Batches (last 1000 out of 27136)
>>>>> >>
>>>>> >> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
>>>>> >> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>> >> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>> >> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>> >>
>>>>> >>
>>>>> >> Here's how I'm launching the spark application.
>>>>> >>
>>>>> >>
>>>>> >> #!/usr/bin/env bash
>>>>> >>
>>>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>>> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>>> >>
>>>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>>> >> --class com.example.KafkaToHbase \
>>>>> >> --master spark://spark_master:7077 \
>>>>> >> --deploy-mode client \
>>>>> >> --num-executors 6 \
>>>>> >> --driver-memory 4G \
>>>>> >> --executor-memory 2G \
>>>>> >> --total-executor-cores 12 \
>>>>> >> --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>>>> >> --conf spark.app.name="Kafka To Hbase" \
>>>>> >> --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>>> >> --conf spark.eventLog.enabled=false \
>>>>> >> --conf spark.eventLog.overwrite=true \
>>>>> >> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>>> >> --conf spark.streaming.backpressure.enabled=false \
>>>>> >> --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>>> >> --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>>> >> --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>>>> >> /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>>>> >>
>>>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams
>>>>> >> <di...@uw.edu>
>>>>> >> wrote:
>>>>> >> > Thanks Cody, I can see that the partitions are well distributed...
>>>>> >> > Then I'm in the process of using the direct api.
>>>>> >> >
>>>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <co...@koeninger.org>
>>>>> >> > wrote:
>>>>> >> >> 60 partitions in and of itself shouldn't be a big performance issue
>>>>> >> >> (as long as producers are distributing across partitions evenly).
>>>>> >> >>
>>>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams
>>>>> >> >> <di...@uw.edu>
>>>>> >> >> wrote:
>>>>> >> >>> Thanks again Cody. Regarding the details 66 kafka partitions on 3
>>>>> >> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>>>>> >> >>> issue with the receiver was the large number of partitions. I had
>>>>> >> >>> miscounted the disks and so 11*3*2 is how I decided to partition
>>>>> >> >>> my
>>>>> >> >>> topic on insertion, ( by my own, unjustified reasoning, on a first
>>>>> >> >>> attempt ) . This worked well enough for me, I put 1.7 billion
>>>>> >> >>> entries
>>>>> >> >>> into Kafka on a map reduce job in 5 and a half hours.
>>>>> >> >>>
>>>>> >> >>> I was concerned using spark 1.5.2 because I'm currently putting my
>>>>> >> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library
>>>>> >> >>> jars
>>>>> >> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>>>>> >> >>> yesterday, I tried building against 1.5.2. So far it's running
>>>>> >> >>> without
>>>>> >> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
>>>>> >> >>> improvement using the same code, but I'll see how the direct api
>>>>> >> >>> handles it. In the end I can reduce the number of partitions in
>>>>> >> >>> Kafka
>>>>> >> >>> if it causes big performance issues.
>>>>> >> >>>
>>>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger
>>>>> >> >>> <co...@koeninger.org>
>>>>> >> >>> wrote:
>>>>> >> >>>> print() isn't really the best way to benchmark things, since it
>>>>> >> >>>> calls
>>>>> >> >>>> take(10) under the covers, but 380 records / second for a single
>>>>> >> >>>> receiver doesn't sound right in any case.
>>>>> >> >>>>
>>>>> >> >>>> Am I understanding correctly that you're trying to process a
>>>>> >> >>>> large
>>>>> >> >>>> number of already-existing kafka messages, not keep up with an
>>>>> >> >>>> incoming stream?  Can you give any details (e.g. hardware, number
>>>>> >> >>>> of
>>>>> >> >>>> topicpartitions, etc)?
>>>>> >> >>>>
>>>>> >> >>>> Really though, I'd try to start with spark 1.6 and direct
>>>>> >> >>>> streams, or
>>>>> >> >>>> even just kafkacat, as a baseline.
>>>>> >> >>>>
>>>>> >> >>>>
>>>>> >> >>>>
>>>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>>>> >> >>>> <di...@uw.edu> wrote:
>>>>> >> >>>>> Hello again. I searched for "backport kafka" in the list
>>>>> >> >>>>> archives
>>>>> >> >>>>> but
>>>>> >> >>>>> couldn't find anything but a post from Spark 0.7.2 . I was going
>>>>> >> >>>>> to
>>>>> >> >>>>> use accumulators to make a counter, but then saw on the
>>>>> >> >>>>> Streaming
>>>>> >> >>>>> tab
>>>>> >> >>>>> the Receiver Statistics. Then I removed all other
>>>>> >> >>>>> "functionality"
>>>>> >> >>>>> except:
>>>>> >> >>>>>
>>>>> >> >>>>>
>>>>> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream =
>>>>> >> >>>>> KafkaUtils
>>>>> >> >>>>>       //createStream(JavaStreamingContext jssc,Class<K>
>>>>> >> >>>>> keyTypeClass,Class<V> valueTypeClass, Class<U> keyDecoderClass,
>>>>> >> >>>>> Class<T> valueDecoderClass, java.util.Map<String,String>
>>>>> >> >>>>> kafkaParams,
>>>>> >> >>>>> java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>>>> >> >>>>>       .createStream(jssc, byte[].class, byte[].class,
>>>>> >> >>>>> kafka.serializer.DefaultDecoder.class,
>>>>> >> >>>>> kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
>>>>> >> >>>>> StorageLevel.MEMORY_AND_DISK_SER());
>>>>> >> >>>>>
>>>>> >> >>>>>        dstream.print();
>>>>> >> >>>>>
>>>>> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
>>>>> >> >>>>> around
>>>>> >> >>>>> 380 records / second. Then to get anywhere near my 10% mentioned
>>>>> >> >>>>> above, I'd need to run around 21 receivers, assuming 380 records
>>>>> >> >>>>> /
>>>>> >> >>>>> second, just using the print output. This seems awfully high to
>>>>> >> >>>>> me,
>>>>> >> >>>>> considering that I wrote 80000+ records a second to Kafka from a
>>>>> >> >>>>> mapreduce job, and that my bottleneck was likely Hbase. Again
>>>>> >> >>>>> using
>>>>> >> >>>>> the 380 estimate, I would need 200+ receivers to reach a similar
>>>>> >> >>>>> amount of reads.
>>>>> >> >>>>>
>>>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the
>>>>> >> >>>>> expected
>>>>> >> >>>>> way
>>>>> >> >>>>> to use the Kafka streaming API, or am I doing something terribly
>>>>> >> >>>>> wrong?
>>>>> >> >>>>>
>>>>> >> >>>>> My application looks like
>>>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>> >> >>>>>
>>>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger
>>>>> >> >>>>> <co...@koeninger.org>
>>>>> >> >>>>> wrote:
>>>>> >> >>>>>> Have you tested for read throughput (without writing to hbase,
>>>>> >> >>>>>> just
>>>>> >> >>>>>> deserialize)?
>>>>> >> >>>>>>
>>>>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?
>>>>> >> >>>>>> The
>>>>> >> >>>>>> kafka direct stream is available starting with 1.3.  If you're
>>>>> >> >>>>>> stuck
>>>>> >> >>>>>> on 1.2, I believe there have been some attempts to backport it,
>>>>> >> >>>>>> search
>>>>> >> >>>>>> the mailing list archives.
>>>>> >> >>>>>>
>>>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>>>>> >> >>>>>> <di...@uw.edu> wrote:
>>>>> >> >>>>>>> I've written an application to get content from a kafka topic
>>>>> >> >>>>>>> with
>>>>> >> >>>>>>> 1.7
>>>>> >> >>>>>>> billion entries,  get the protobuf serialized entries, and
>>>>> >> >>>>>>> insert
>>>>> >> >>>>>>> into
>>>>> >> >>>>>>> hbase. Currently the environment that I'm running in is Spark
>>>>> >> >>>>>>> 1.2.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
>>>>> >> >>>>>>> between
>>>>> >> >>>>>>> 0-2500 writes / second. This will take much too long to
>>>>> >> >>>>>>> consume
>>>>> >> >>>>>>> the
>>>>> >> >>>>>>> entries.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> I currently believe that the spark kafka receiver is the
>>>>> >> >>>>>>> bottleneck.
>>>>> >> >>>>>>> I've tried both 1.2 receivers, with the WAL and without, and
>>>>> >> >>>>>>> didn't
>>>>> >> >>>>>>> notice any large performance difference. I've tried many
>>>>> >> >>>>>>> different
>>>>> >> >>>>>>> spark configuration options, but can't seem to get better
>>>>> >> >>>>>>> performance.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> I saw 80000 requests / second inserting these records into
>>>>> >> >>>>>>> kafka
>>>>> >> >>>>>>> using
>>>>> >> >>>>>>> yarn / hbase / protobuf / kafka in a bulk fashion.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd
>>>>> >> >>>>>>> like to
>>>>> >> >>>>>>> at least get 10%.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> My application looks like
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> This is my first spark application. I'd appreciate any
>>>>> >> >>>>>>> assistance.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>>
>>>>> >> >>>>>>>
>>>>> >> >>>>>>>
>>>>> >> >>>>
>>>>> >> >>>>
>>>>> >> >>>>
>>>>> >>
>>>>> >>
>>>>> >
>>>>
>>>>
>>>



Re: Improving performance of a kafka spark streaming app

Posted by Cody Koeninger <co...@koeninger.org>.
If your batch time is 1 second and your average processing time is
1.16 seconds, you're always going to be falling behind.  That would
explain why you've built up an hour of scheduling delay after eight
hours of running.
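
To make that arithmetic concrete, here is a minimal sketch. It assumes every
batch takes exactly the average processing time and the scheduler never
idles, which is a simplification; the class and method names are made up for
illustration, and the batch count is the one shown in the streaming UI
output quoted below.

```java
import java.util.Locale;

/** Hypothetical back-of-the-envelope model of how scheduling delay builds up. */
final class SchedulingDelayEstimate {

    /**
     * Backlog in seconds after the given number of batches, assuming each batch
     * takes exactly processingSec and a new batch is queued every batchIntervalSec.
     */
    static double backlogSeconds(double batchIntervalSec, double processingSec, long batches) {
        // A batch that finishes faster than the interval adds no backlog.
        double deficitPerBatch = Math.max(0.0, processingSec - batchIntervalSec);
        return deficitPerBatch * batches;
    }

    public static void main(String[] args) {
        double batchIntervalSec = 1.0; // new Duration(1000) in the application
        double processingSec = 1.16;   // average processing time
        long batches = 27136;          // completed batches reported by the streaming UI

        double backlog = backlogSeconds(batchIntervalSec, processingSec, batches);
        System.out.printf(Locale.ROOT, "falls behind by %.2f s per batch%n",
                processingSec - batchIntervalSec);
        System.out.printf(Locale.ROOT, "estimated backlog: %.0f s (about %.1f h)%n",
                backlog, backlog / 3600.0);
    }
}
```

With those inputs the estimate comes out a little over an hour, in line with
the 1.2 h scheduling delay in the UI. The delay only stops growing once the
average processing time drops below the batch interval.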

On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
> Hi Mich again,
>
> Regarding batch window, etc. I have provided the sources, but I'm not
> currently calling the window function. Did you see the program source?
> It's only 100 lines.
>
> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>
> Then I would expect I'm using defaults, other than what has been shown
> in the configuration.
>
> For example:
>
> In the launcher configuration I set --conf
> spark.streaming.kafka.maxRatePerPartition=500 \ and I believe there
> are 500 messages for the duration set in the application:
> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
> Duration(1000));
>
>
> Then with the --num-executors 6 \ submit flag, and the
> spark.streaming.kafka.maxRatePerPartition=500 I think that's how we
> arrive at the 3000 events per batch in the UI, pasted above.
>
> Feel free to correct me if I'm wrong.
>
> Then are you suggesting that I set the window?
>
> Maybe following this as reference:
>
> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>
> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
> <mi...@gmail.com> wrote:
>> Ok
>>
>> What is the set up for these please?
>>
>> batch window
>> window length
>> sliding interval
>>
>> And also in each batch window how much data do you get in (no of messages in
>> the topic whatever)?
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>> On 18 June 2016 at 21:01, Mich Talebzadeh <mi...@gmail.com> wrote:
>>>
>>> I believe you have an issue with performance?
>>>
>>> have you checked spark GUI (default 4040) for details including shuffles
>>> etc?
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>>
>>> On 18 June 2016 at 20:59, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>>
>>>> There are 25 nodes in the spark cluster.
>>>>
>>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>>>> <mi...@gmail.com> wrote:
>>>> > how many nodes are in your cluster?
>>>> >
>>>> > --num-executors 6 \
>>>> >  --driver-memory 4G \
>>>> >  --executor-memory 2G \
>>>> >  --total-executor-cores 12 \
>>>> >
>>>> >
>>>> > Dr Mich Talebzadeh
>>>> >
>>>> >
>>>> >
>>>> > LinkedIn
>>>> >
>>>> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> >
>>>> >
>>>> >
>>>> > http://talebzadehmich.wordpress.com
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <di...@uw.edu>
>>>> > wrote:
>>>> >>
>>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>>> >> Kafka using the direct api and inserts content into an hbase cluster,
>>>> >> as described in this thread. I was away from this project for a while
>>>> >> due to events in my family.
>>>> >>
>>>> >> Currently my scheduling delay is high, but the processing time is
>>>> >> stable around a second. I changed my setup to use 6 kafka partitions
>>>> >> on a set of smaller kafka brokers, with fewer disks. I've included
>>>> >> some details below, including the script I use to launch the
>>>> >> application. I'm using a Spark on Hbase library, whose version is
>>>> >> relevant to my Hbase cluster. Is it apparent there is something wrong
>>>> >> with my launch method that could be causing the delay, related to the
>>>> >> included jars?
>>>> >>
>>>> >> Or is there something wrong with the very simple approach I'm taking
>>>> >> for the application?
>>>> >>
>>>> >> Any advice is appreciated.
>>>> >>
>>>> >>
>>>> >> The application:
>>>> >>
>>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>> >>
>>>> >>
>>>> >> From the streaming UI I get something like:
>>>> >>
>>>> >> Completed Batches (last 1000 out of 27136)
>>>> >>
>>>> >> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
>>>> >> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
>>>> >> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
>>>> >> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>>>> >>
>>>> >>
>>>> >> Here's how I'm launching the spark application.
>>>> >>
>>>> >>
>>>> >> #!/usr/bin/env bash
>>>> >>
>>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>> >>
>>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>> >> --class com.example.KafkaToHbase \
>>>> >> --master spark://spark_master:7077 \
>>>> >> --deploy-mode client \
>>>> >> --num-executors 6 \
>>>> >> --driver-memory 4G \
>>>> >> --executor-memory 2G \
>>>> >> --total-executor-cores 12 \
>>>> >> --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>>> >> --conf spark.app.name="Kafka To Hbase" \
>>>> >> --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>> >> --conf spark.eventLog.enabled=false \
>>>> >> --conf spark.eventLog.overwrite=true \
>>>> >> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>> >> --conf spark.streaming.backpressure.enabled=false \
>>>> >> --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>> >> --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>> >> --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>>> >> /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>>> >>
>>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams
>>>> >> <di...@uw.edu>
>>>> >> wrote:
>>>> >> > Thanks Cody, I can see that the partitions are well distributed...
>>>> >> > Then I'm in the process of using the direct api.
>>>> >> >
>>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <co...@koeninger.org>
>>>> >> > wrote:
>>>> >> >> 60 partitions in and of itself shouldn't be a big performance issue
>>>> >> >> (as long as producers are distributing across partitions evenly).
>>>> >> >>
>>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams
>>>> >> >> <di...@uw.edu>
>>>> >> >> wrote:
>>>> >> >>> Thanks again Cody. Regarding the details 66 kafka partitions on 3
>>>> >> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>>>> >> >>> issue with the receiver was the large number of partitions. I had
>>>> >> >>> miscounted the disks and so 11*3*2 is how I decided to partition
>>>> >> >>> my
>>>> >> >>> topic on insertion, ( by my own, unjustified reasoning, on a first
>>>> >> >>> attempt ) . This worked well enough for me, I put 1.7 billion
>>>> >> >>> entries
>>>> >> >>> into Kafka on a map reduce job in 5 and a half hours.
>>>> >> >>>
>>>> >> >>> I was concerned using spark 1.5.2 because I'm currently putting my
>>>> >> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library
>>>> >> >>> jars
>>>> >> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>>>> >> >>> yesterday, I tried building against 1.5.2. So far it's running
>>>> >> >>> without
>>>> >> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
>>>> >> >>> improvement using the same code, but I'll see how the direct api
>>>> >> >>> handles it. In the end I can reduce the number of partitions in
>>>> >> >>> Kafka
>>>> >> >>> if it causes big performance issues.
>>>> >> >>>
>>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger
>>>> >> >>> <co...@koeninger.org>
>>>> >> >>> wrote:
>>>> >> >>>> print() isn't really the best way to benchmark things, since it
>>>> >> >>>> calls
>>>> >> >>>> take(10) under the covers, but 380 records / second for a single
>>>> >> >>>> receiver doesn't sound right in any case.
>>>> >> >>>>
>>>> >> >>>> Am I understanding correctly that you're trying to process a
>>>> >> >>>> large
>>>> >> >>>> number of already-existing kafka messages, not keep up with an
>>>> >> >>>> incoming stream?  Can you give any details (e.g. hardware, number
>>>> >> >>>> of
>>>> >> >>>> topicpartitions, etc)?
>>>> >> >>>>
>>>> >> >>>> Really though, I'd try to start with spark 1.6 and direct
>>>> >> >>>> streams, or
>>>> >> >>>> even just kafkacat, as a baseline.
>>>> >> >>>>
>>>> >> >>>>
>>>> >> >>>>
>>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>>> >> >>>> <di...@uw.edu> wrote:
>>>> >> >>>>> Hello again. I searched for "backport kafka" in the list
>>>> >> >>>>> archives
>>>> >> >>>>> but
>>>> >> >>>>> couldn't find anything but a post from Spark 0.7.2 . I was going
>>>> >> >>>>> to
>>>> >> >>>>> use accumulators to make a counter, but then saw on the
>>>> >> >>>>> Streaming
>>>> >> >>>>> tab
>>>> >> >>>>> the Receiver Statistics. Then I removed all other
>>>> >> >>>>> "functionality"
>>>> >> >>>>> except:
>>>> >> >>>>>
>>>> >> >>>>>
>>>> >> >>>>>     // Signature: createStream(JavaStreamingContext jssc,
>>>> >> >>>>>     //     Class<K> keyTypeClass, Class<V> valueTypeClass,
>>>> >> >>>>>     //     Class<U> keyDecoderClass, Class<T> valueDecoderClass,
>>>> >> >>>>>     //     java.util.Map<String,String> kafkaParams,
>>>> >> >>>>>     //     java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>>> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>>>> >> >>>>>         .createStream(jssc, byte[].class, byte[].class,
>>>> >> >>>>>             kafka.serializer.DefaultDecoder.class,
>>>> >> >>>>>             kafka.serializer.DefaultDecoder.class,
>>>> >> >>>>>             kafkaParamsMap, topicMap,
>>>> >> >>>>>             StorageLevel.MEMORY_AND_DISK_SER());
>>>> >> >>>>>
>>>> >> >>>>>     dstream.print();
>>>> >> >>>>>
>>>> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
>>>> >> >>>>> around
>>>> >> >>>>> 380 records / second. Then to get anywhere near my 10% mentioned
>>>> >> >>>>> above, I'd need to run around 21 receivers, assuming 380 records
>>>> >> >>>>> /
>>>> >> >>>>> second, just using the print output. This seems awfully high to
>>>> >> >>>>> me,
>>>> >> >>>>> considering that I wrote 80000+ records a second to Kafka from a
>>>> >> >>>>> mapreduce job, and that my bottleneck was likely Hbase. Again
>>>> >> >>>>> using
>>>> >> >>>>> the 380 estimate, I would need 200+ receivers to reach a similar
>>>> >> >>>>> amount of reads.
>>>> >> >>>>>
>>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the
>>>> >> >>>>> expected
>>>> >> >>>>> way
>>>> >> >>>>> to use the Kafka streaming API, or am I doing something terribly
>>>> >> >>>>> wrong?
>>>> >> >>>>>
>>>> >> >>>>> My application looks like
>>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>> >> >>>>>
>>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger
>>>> >> >>>>> <co...@koeninger.org>
>>>> >> >>>>> wrote:
>>>> >> >>>>>> Have you tested for read throughput (without writing to hbase,
>>>> >> >>>>>> just
>>>> >> >>>>>> deserialize)?
>>>> >> >>>>>>
>>>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?
>>>> >> >>>>>> The
>>>> >> >>>>>> kafka direct stream is available starting with 1.3.  If you're
>>>> >> >>>>>> stuck
>>>> >> >>>>>> on 1.2, I believe there have been some attempts to backport it,
>>>> >> >>>>>> search
>>>> >> >>>>>> the mailing list archives.
>>>> >> >>>>>>
>>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>>>> >> >>>>>> <di...@uw.edu> wrote:
>>>> >> >>>>>>> I've written an application to get content from a kafka topic
>>>> >> >>>>>>> with
>>>> >> >>>>>>> 1.7
>>>> >> >>>>>>> billion entries,  get the protobuf serialized entries, and
>>>> >> >>>>>>> insert
>>>> >> >>>>>>> into
>>>> >> >>>>>>> hbase. Currently the environment that I'm running in is Spark
>>>> >> >>>>>>> 1.2.
>>>> >> >>>>>>>
>>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
>>>> >> >>>>>>> between
>>>> >> >>>>>>> 0-2500 writes / second. This will take much too long to
>>>> >> >>>>>>> consume
>>>> >> >>>>>>> the
>>>> >> >>>>>>> entries.
>>>> >> >>>>>>>
>>>> >> >>>>>>> I currently believe that the spark kafka receiver is the
>>>> >> >>>>>>> bottleneck.
>>>> >> >>>>>>> I've tried both 1.2 receivers, with the WAL and without, and
>>>> >> >>>>>>> didn't
>>>> >> >>>>>>> notice any large performance difference. I've tried many
>>>> >> >>>>>>> different
>>>> >> >>>>>>> spark configuration options, but can't seem to get better
>>>> >> >>>>>>> performance.
>>>> >> >>>>>>>
>>>> >> >>>>>>> I saw 80000 requests / second inserting these records into
>>>> >> >>>>>>> kafka
>>>> >> >>>>>>> using
>>>> >> >>>>>>> yarn / hbase / protobuf / kafka in a bulk fashion.
>>>> >> >>>>>>>
>>>> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd
>>>> >> >>>>>>> like to
>>>> >> >>>>>>> at least get 10%.
>>>> >> >>>>>>>
>>>> >> >>>>>>> My application looks like
>>>> >> >>>>>>>
>>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>> >> >>>>>>>
>>>> >> >>>>>>> This is my first spark application. I'd appreciate any
>>>> >> >>>>>>> assistance.
>>>> >> >>>>>>>
>>>> >> >>>>>>>
>>>> >> >>>>>>>
>>>> >> >>>>>>>
>>>> >> >>>>
>>>> >> >>>>
>>>> >> >>>>
>>>> >>
>>>> >>
>>>> >
>>>
>>>
>>



Re: Improving performance of a kafka spark streaming app

Posted by Colin Kincaid Williams <di...@uw.edu>.
Hi Mich again,

Regarding the batch window, etc.: I have provided the source, and I'm not
currently calling the window function. Did you see the program source?
It's only 100 lines.

https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877

So I would expect that I'm using the defaults, apart from what is shown
in the launch configuration.

For example:

In the launcher configuration I set

--conf spark.streaming.kafka.maxRatePerPartition=500

and I believe that gives 500 messages for the batch duration set in the
application (1000 ms):

JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(1000));


Then with the --num-executors 6 submit flag and
spark.streaming.kafka.maxRatePerPartition=500, I think that's how we
arrive at the 3000 events per batch (6 x 500) shown in the streaming UI
output pasted earlier in this thread.
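
To make that arithmetic concrete, here is a minimal sketch in plain Java
(no Spark dependency). It assumes what the Spark configuration
documentation says about spark.streaming.kafka.maxRatePerPartition: the
cap is in records per second per Kafka partition, so the factor of 6
would correspond to the 6 Kafka partitions of the topic, which happens
to equal the executor count here. The class and method names are made up
for illustration; the numbers are the ones from this thread.

```java
// Back-of-the-envelope check of the per-batch cap for a Kafka direct stream.
// Assumption: the rate cap applies per Kafka partition, per second.
// Class and method names are hypothetical, for illustration only.
public class BatchCapSketch {

    /** Upper bound on records per batch: per-partition rate x partitions x batch length. */
    public static long maxRecordsPerBatch(long maxRatePerPartition, int kafkaPartitions, long batchMillis) {
        return maxRatePerPartition * kafkaPartitions * batchMillis / 1000;
    }

    /** Hours needed to read totalRecords at a steady recordsPerSecond. */
    public static double hoursToDrain(long totalRecords, long recordsPerSecond) {
        return totalRecords / (double) recordsPerSecond / 3600.0;
    }

    public static void main(String[] args) {
        // maxRatePerPartition=500, 6 partitions, Duration(1000) => 1 s batches
        long perBatch = maxRecordsPerBatch(500, 6, 1000);
        System.out.println("max records per batch: " + perBatch); // prints 3000

        // With 1 s batches the per-second cap is 500 * 6.
        long perSecond = 500L * 6;
        double hours = hoursToDrain(1_700_000_000L, perSecond);
        System.out.println("approx hours to read 1.7 billion records: " + Math.round(hours)); // prints 157
    }
}
```

If that reading is right, the cap alone limits the job to 3000 records
per second, so the 1.7 billion entries would take roughly 157 hours, and
the target of 8000 writes per second (10% of 80000) would need a higher
maxRatePerPartition or more partitions.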

Feel free to correct me if I'm wrong.

Are you suggesting that I set a window?

Maybe using this as a reference:

https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html

On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
<mi...@gmail.com> wrote:
> Ok
>
> What is the set up for these please?
>
> batch window
> window length
> sliding interval
>
> And also in each batch window how much data do you get in (no of messages in
> the topic whatever)?
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 18 June 2016 at 21:01, Mich Talebzadeh <mi...@gmail.com> wrote:
>>
>> I believe you have an issue with performance?
>>
>> have you checked spark GUI (default 4040) for details including shuffles
>> etc?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>> On 18 June 2016 at 20:59, Colin Kincaid Williams <di...@uw.edu> wrote:
>>>
>>> There are 25 nodes in the spark cluster.
>>>
>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>>> <mi...@gmail.com> wrote:
>>> > how many nodes are in your cluster?
>>> >
>>> > --num-executors 6 \
>>> >  --driver-memory 4G \
>>> >  --executor-memory 2G \
>>> >  --total-executor-cores 12 \
>>> >
>>> >
>>> > Dr Mich Talebzadeh
>>> >
>>> >
>>> >
>>> > LinkedIn
>>> >
>>> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> >
>>> >
>>> >
>>> > http://talebzadehmich.wordpress.com
>>> >
>>> >
>>> >
>>> >
>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <di...@uw.edu>
>>> > wrote:
>>> >>
>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>> >> Kafka using the direct api and inserts content into an hbase cluster,
>>> >> as described in this thread. I was away from this project for a while
>>> >> due to events in my family.
>>> >>
>>> >> Currently my scheduling delay is high, but the processing time is
>>> >> stable around a second. I changed my setup to use 6 kafka partitions
>>> >> on a set of smaller kafka brokers, with fewer disks. I've included
>>> >> some details below, including the script I use to launch the
>>> >> application. I'm using a Spark on Hbase library, whose version is
>>> >> relevant to my Hbase cluster. Is it apparent there is something wrong
>>> >> with my launch method that could be causing the delay, related to the
>>> >> included jars?
>>> >>
>>> >> Or is there something wrong with the very simple approach I'm taking
>>> >> for the application?
>>> >>
>>> >> Any advice is appreciated.
>>> >>
>>> >>
>>> >> The application:
>>> >>
>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>> >>
>>> >>
>>> >> From the streaming UI I get something like:
>>> >>
>>> >> Completed Batches (last 1000 out of 27136)
>>> >>
>>> >> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
>>> >> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
>>> >> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
>>> >> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>>> >>
>>> >>
>>> >> Here's how I'm launching the spark application.
>>> >>
>>> >>
>>> >> #!/usr/bin/env bash
>>> >>
>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>> >>
>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>> >>   --class com.example.KafkaToHbase \
>>> >>   --master spark://spark_master:7077 \
>>> >>   --deploy-mode client \
>>> >>   --num-executors 6 \
>>> >>   --driver-memory 4G \
>>> >>   --executor-memory 2G \
>>> >>   --total-executor-cores 12 \
>>> >>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>> >>   --conf spark.app.name="Kafka To Hbase" \
>>> >>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>> >>   --conf spark.eventLog.enabled=false \
>>> >>   --conf spark.eventLog.overwrite=true \
>>> >>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>> >>   --conf spark.streaming.backpressure.enabled=false \
>>> >>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>> >>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>> >>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>> >>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>> >>
>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams
>>> >> <di...@uw.edu>
>>> >> wrote:
>>> >> > Thanks Cody, I can see that the partitions are well distributed...
>>> >> > Then I'm in the process of using the direct api.
>>> >> >
>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <co...@koeninger.org>
>>> >> > wrote:
>>> >> >> 60 partitions in and of itself shouldn't be a big performance issue
>>> >> >> (as long as producers are distributing across partitions evenly).
>>> >> >>
>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams
>>> >> >> <di...@uw.edu>
>>> >> >> wrote:
>>> >> >>> Thanks again Cody. Regarding the details 66 kafka partitions on 3
>>> >> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>>> >> >>> issue with the receiver was the large number of partitions. I had
>>> >> >>> miscounted the disks and so 11*3*2 is how I decided to partition
>>> >> >>> my
>>> >> >>> topic on insertion, ( by my own, unjustified reasoning, on a first
>>> >> >>> attempt ) . This worked well enough for me, I put 1.7 billion
>>> >> >>> entries
>>> >> >>> into Kafka on a map reduce job in 5 and a half hours.
>>> >> >>>
>>> >> >>> I was concerned using spark 1.5.2 because I'm currently putting my
>>> >> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library
>>> >> >>> jars
>>> >> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>>> >> >>> yesterday, I tried building against 1.5.2. So far it's running
>>> >> >>> without
>>> >> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
>>> >> >>> improvement using the same code, but I'll see how the direct api
>>> >> >>> handles it. In the end I can reduce the number of partitions in
>>> >> >>> Kafka
>>> >> >>> if it causes big performance issues.
>>> >> >>>
>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger
>>> >> >>> <co...@koeninger.org>
>>> >> >>> wrote:
>>> >> >>>> print() isn't really the best way to benchmark things, since it
>>> >> >>>> calls
>>> >> >>>> take(10) under the covers, but 380 records / second for a single
>>> >> >>>> receiver doesn't sound right in any case.
>>> >> >>>>
>>> >> >>>> Am I understanding correctly that you're trying to process a
>>> >> >>>> large
>>> >> >>>> number of already-existing kafka messages, not keep up with an
>>> >> >>>> incoming stream?  Can you give any details (e.g. hardware, number
>>> >> >>>> of
>>> >> >>>> topicpartitions, etc)?
>>> >> >>>>
>>> >> >>>> Really though, I'd try to start with spark 1.6 and direct
>>> >> >>>> streams, or
>>> >> >>>> even just kafkacat, as a baseline.
>>> >> >>>>
>>> >> >>>>
>>> >> >>>>
>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>> >> >>>> <di...@uw.edu> wrote:
>>> >> >>>>> Hello again. I searched for "backport kafka" in the list
>>> >> >>>>> archives
>>> >> >>>>> but
>>> >> >>>>> couldn't find anything but a post from Spark 0.7.2 . I was going
>>> >> >>>>> to
>>> >> >>>>> use accumulators to make a counter, but then saw on the
>>> >> >>>>> Streaming
>>> >> >>>>> tab
>>> >> >>>>> the Receiver Statistics. Then I removed all other
>>> >> >>>>> "functionality"
>>> >> >>>>> except:
>>> >> >>>>>
>>> >> >>>>>
>>> >> >>>>>     // Signature: createStream(JavaStreamingContext jssc,
>>> >> >>>>>     //     Class<K> keyTypeClass, Class<V> valueTypeClass,
>>> >> >>>>>     //     Class<U> keyDecoderClass, Class<T> valueDecoderClass,
>>> >> >>>>>     //     java.util.Map<String,String> kafkaParams,
>>> >> >>>>>     //     java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>>> >> >>>>>         .createStream(jssc, byte[].class, byte[].class,
>>> >> >>>>>             kafka.serializer.DefaultDecoder.class,
>>> >> >>>>>             kafka.serializer.DefaultDecoder.class,
>>> >> >>>>>             kafkaParamsMap, topicMap,
>>> >> >>>>>             StorageLevel.MEMORY_AND_DISK_SER());
>>> >> >>>>>
>>> >> >>>>>     dstream.print();
>>> >> >>>>>
>>> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
>>> >> >>>>> around
>>> >> >>>>> 380 records / second. Then to get anywhere near my 10% mentioned
>>> >> >>>>> above, I'd need to run around 21 receivers, assuming 380 records
>>> >> >>>>> /
>>> >> >>>>> second, just using the print output. This seems awfully high to
>>> >> >>>>> me,
>>> >> >>>>> considering that I wrote 80000+ records a second to Kafka from a
>>> >> >>>>> mapreduce job, and that my bottleneck was likely Hbase. Again
>>> >> >>>>> using
>>> >> >>>>> the 380 estimate, I would need 200+ receivers to reach a similar
>>> >> >>>>> amount of reads.
>>> >> >>>>>
>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the
>>> >> >>>>> expected
>>> >> >>>>> way
>>> >> >>>>> to use the Kafka streaming API, or am I doing something terribly
>>> >> >>>>> wrong?
>>> >> >>>>>
>>> >> >>>>> My application looks like
>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>> >> >>>>>
>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger
>>> >> >>>>> <co...@koeninger.org>
>>> >> >>>>> wrote:
>>> >> >>>>>> Have you tested for read throughput (without writing to hbase,
>>> >> >>>>>> just
>>> >> >>>>>> deserialize)?
>>> >> >>>>>>
>>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?
>>> >> >>>>>> The
>>> >> >>>>>> kafka direct stream is available starting with 1.3.  If you're
>>> >> >>>>>> stuck
>>> >> >>>>>> on 1.2, I believe there have been some attempts to backport it,
>>> >> >>>>>> search
>>> >> >>>>>> the mailing list archives.
>>> >> >>>>>>
>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>>> >> >>>>>> <di...@uw.edu> wrote:
>>> >> >>>>>>> I've written an application to get content from a kafka topic
>>> >> >>>>>>> with
>>> >> >>>>>>> 1.7
>>> >> >>>>>>> billion entries,  get the protobuf serialized entries, and
>>> >> >>>>>>> insert
>>> >> >>>>>>> into
>>> >> >>>>>>> hbase. Currently the environment that I'm running in is Spark
>>> >> >>>>>>> 1.2.
>>> >> >>>>>>>
>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
>>> >> >>>>>>> between
>>> >> >>>>>>> 0-2500 writes / second. This will take much too long to
>>> >> >>>>>>> consume
>>> >> >>>>>>> the
>>> >> >>>>>>> entries.
>>> >> >>>>>>>
>>> >> >>>>>>> I currently believe that the spark kafka receiver is the
>>> >> >>>>>>> bottleneck.
>>> >> >>>>>>> I've tried both 1.2 receivers, with the WAL and without, and
>>> >> >>>>>>> didn't
>>> >> >>>>>>> notice any large performance difference. I've tried many
>>> >> >>>>>>> different
>>> >> >>>>>>> spark configuration options, but can't seem to get better
>>> >> >>>>>>> performance.
>>> >> >>>>>>>
>>> >> >>>>>>> I saw 80000 requests / second inserting these records into
>>> >> >>>>>>> kafka
>>> >> >>>>>>> using
>>> >> >>>>>>> yarn / hbase / protobuf / kafka in a bulk fashion.
>>> >> >>>>>>>
>>> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd
>>> >> >>>>>>> like to
>>> >> >>>>>>> at least get 10%.
>>> >> >>>>>>>
>>> >> >>>>>>> My application looks like
>>> >> >>>>>>>
>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>> >> >>>>>>>
>>> >> >>>>>>> This is my first spark application. I'd appreciate any
>>> >> >>>>>>> assistance.
>>> >> >>>>>>>
>>> >> >>>>>>>
>>> >> >>>>>>>
>>> >> >>>>>>>
>>> >> >>>>
>>> >> >>>>
>>> >> >>>>
>>> >>
>>> >>
>>> >
>>
>>
>



Re: Improving performance of a kafka spark streaming app

Posted by Mich Talebzadeh <mi...@gmail.com>.
Ok

What is the setup for these, please?

batch window
window length
sliding interval

And in each batch window, how much data comes in (number of messages in
the topic, or whatever measure you have)?




Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 18 June 2016 at 21:01, Mich Talebzadeh <mi...@gmail.com> wrote:

> I believe you have an issue with performance?
>
> have you checked spark GUI (default 4040) for details including shuffles
> etc?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 18 June 2016 at 20:59, Colin Kincaid Williams <di...@uw.edu> wrote:
>
>> There are 25 nodes in the spark cluster.
>>
>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>> <mi...@gmail.com> wrote:
>> > how many nodes are in your cluster?
>> >
>> > --num-executors 6 \
>> >  --driver-memory 4G \
>> >  --executor-memory 2G \
>> >  --total-executor-cores 12 \
>> >
>> >
>> > Dr Mich Talebzadeh
>> >
>> >
>> >
>> > LinkedIn
>> >
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >
>> >
>> >
>> > http://talebzadehmich.wordpress.com
>> >
>> >
>> >
>> >
>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <di...@uw.edu>
>> wrote:
>> >>
>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>> >> Kafka using the direct api and inserts content into an hbase cluster,
>> >> as described in this thread. I was away from this project for awhile
>> >> due to events in my family.
>> >>
>> >> Currently my scheduling delay is high, but the processing time is
>> >> stable around a second. I changed my setup to use 6 kafka partitions
>> >> on a set of smaller kafka brokers, with fewer disks. I've included
>> >> some details below, including the script I use to launch the
>> >> application. I'm using a Spark on Hbase library, whose version is
>> >> relevant to my Hbase cluster. Is it apparent there is something wrong
>> >> with my launch method that could be causing the delay, related to the
>> >> included jars?
>> >>
>> >> Or is there something wrong with the very simple approach I'm taking
>> >> for the application?
>> >>
>> >> Any advice is appriciated.
>> >>
>> >>
>> >> The application:
>> >>
>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>> >>
>> >>
>> >> From the streaming UI I get something like:
>> >>
>> >> table Completed Batches (last 1000 out of 27136)
>> >>
>> >>
>> >> Batch Time Input Size Scheduling Delay (?) Processing Time (?) Total
>> >> Delay (?) Output Ops: Succeeded/Total
>> >>
>> >> 2016/06/18 11:21:32 3000 events 1.2 h 1 s 1.2 h 1/1
>> >>
>> >> 2016/06/18 11:21:31 3000 events 1.2 h 1 s 1.2 h 1/1
>> >>
>> >> 2016/06/18 11:21:30 3000 events 1.2 h 1 s 1.2 h 1/1
>> >>
>> >>
>> >> Here's how I'm launching the spark application.
>> >>
>> >>
>> >> #!/usr/bin/env bash
>> >>
>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>> >>
>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>> >>
>> >> export
>> >>
>> HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>> >>
>> >>
>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>> >>
>> >> --class com.example.KafkaToHbase \
>> >>
>> >> --master spark://spark_master:7077 \
>> >>
>> >> --deploy-mode client \
>> >>
>> >> --num-executors 6 \
>> >>
>> >> --driver-memory 4G \
>> >>
>> >> --executor-memory 2G \
>> >>
>> >> --total-executor-cores 12 \
>> >>
>> >> --jars
>> >>
>> /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar
>> >> \
>> >>
>> >> --conf spark.app.name="Kafka To Hbase" \
>> >>
>> >> --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>> >>
>> >> --conf spark.eventLog.enabled=false \
>> >>
>> >> --conf spark.eventLog.overwrite=true \
>> >>
>> >> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>> >>
>> >> --conf spark.streaming.backpressure.enabled=false \
>> >>
>> >> --conf spark.streaming.kafka.maxRatePerPartition=500 \
>> >>
>> >> --driver-class-path /home/colin.williams/kafka-hbase.jar \
>> >>
>> >> --driver-java-options
>> >>
>> >>
>> -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*
>> >> \
>> >>
>> >> /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable"
>> >> "broker1:9092,broker2:9092"
>> >>
>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <discord@uw.edu
>> >
>> >> wrote:
>> >> > Thanks Cody, I can see that the partitions are well distributed...
>> >> > Then I'm in the process of using the direct api.
>> >> >
>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <co...@koeninger.org>
>> >> > wrote:
>> >> >> 60 partitions in and of itself shouldn't be a big performance issue
>> >> >> (as long as producers are distributing across partitions evenly).
>> >> >>
>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <
>> discord@uw.edu>
>> >> >> wrote:
>> >> >>> Thanks again Cody. Regarding the details 66 kafka partitions on 3
>> >> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>> >> >>> issue with the receiver was the large number of partitions. I had
>> >> >>> miscounted the disks and so 11*3*2 is how I decided to partition my
>> >> >>> topic on insertion, ( by my own, unjustified reasoning, on a first
>> >> >>> attempt ) . This worked well enough for me, I put 1.7 billion
>> entries
>> >> >>> into Kafka on a map reduce job in 5 and a half hours.
>> >> >>>
>> >> >>> I was concerned using spark 1.5.2 because I'm currently putting my
>> >> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library
>> jars
>> >> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>> >> >>> yesterday, I tried building against 1.5.2. So far it's running
>> without
>> >> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
>> >> >>> improvement using the same code, but I'll see how the direct api
>> >> >>> handles it. In the end I can reduce the number of partitions in
>> Kafka
>> >> >>> if it causes big performance issues.
>> >> >>>
>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <cody@koeninger.org
>> >
>> >> >>> wrote:
>> >> >>>> print() isn't really the best way to benchmark things, since it
>> calls
>> >> >>>> take(10) under the covers, but 380 records / second for a single
>> >> >>>> receiver doesn't sound right in any case.
>> >> >>>>
>> >> >>>> Am I understanding correctly that you're trying to process a large
>> >> >>>> number of already-existing kafka messages, not keep up with an
>> >> >>>> incoming stream?  Can you give any details (e.g. hardware, number
>> of
>> >> >>>> topicpartitions, etc)?
>> >> >>>>
>> >> >>>> Really though, I'd try to start with spark 1.6 and direct
>> streams, or
>> >> >>>> even just kafkacat, as a baseline.
>> >> >>>>
>> >> >>>>
>> >> >>>>
>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>> >> >>>> <di...@uw.edu> wrote:
>> >> >>>>> Hello again. I searched for "backport kafka" in the list archives
>> >> >>>>> but
>> >> >>>>> couldn't find anything but a post from Spark 0.7.2 . I was going
>> to
>> >> >>>>> use accumulators to make a counter, but then saw on the Streaming
>> >> >>>>> tab
>> >> >>>>> the Receiver Statistics. Then I removed all other "functionality"
>> >> >>>>> except:
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream =
>> >> >>>>> KafkaUtils
>> >> >>>>>       //createStream(JavaStreamingContext jssc,Class<K>
>> >> >>>>> keyTypeClass,Class<V> valueTypeClass, Class<U> keyDecoderClass,
>> >> >>>>> Class<T> valueDecoderClass, java.util.Map<String,String>
>> >> >>>>> kafkaParams,
>> >> >>>>> java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>> >> >>>>>       .createStream(jssc, byte[].class, byte[].class,
>> >> >>>>> kafka.serializer.DefaultDecoder.class,
>> >> >>>>> kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
>> >> >>>>> StorageLevel.MEMORY_AND_DISK_SER());
>> >> >>>>>
>> >> >>>>>        dstream.print();
>> >> >>>>>
>> >> >>>>> Then in the Recieiver Stats for the single receiver, I'm seeing
>> >> >>>>> around
>> >> >>>>> 380 records / second. Then to get anywhere near my 10% mentioned
>> >> >>>>> above, I'd need to run around 21 receivers, assuming 380 records
>> /
>> >> >>>>> second, just using the print output. This seems awfully high to
>> me,
>> >> >>>>> considering that I wrote 80000+ records a second to Kafka from a
>> >> >>>>> mapreduce job, and that my bottleneck was likely Hbase. Again using
>> >> >>>>> the 380 estimate, I would need 200+ receivers to reach a similar
>> >> >>>>> amount of reads.
>> >> >>>>>
>> >> >>>>> Even given the issues with the 1.2 receivers, is this the expected
>> >> >>>>> way to use the Kafka streaming API, or am I doing something terribly
>> >> >>>>> wrong?
>> >> >>>>>
>> >> >>>>> My application looks like
>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>> >> >>>>>
>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <cody@koeninger.org>
>> >> >>>>> wrote:
>> >> >>>>>> Have you tested for read throughput (without writing to hbase, just
>> >> >>>>>> deserialize)?
>> >> >>>>>>
>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?  The
>> >> >>>>>> kafka direct stream is available starting with 1.3.  If you're stuck
>> >> >>>>>> on 1.2, I believe there have been some attempts to backport it,
>> >> >>>>>> search the mailing list archives.
>> >> >>>>>>
>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>> >> >>>>>> <di...@uw.edu> wrote:
>> >> >>>>>>> I've written an application to get content from a kafka topic with 1.7
>> >> >>>>>>> billion entries,  get the protobuf serialized entries, and insert into
>> >> >>>>>>> hbase. Currently the environment that I'm running in is Spark 1.2.
>> >> >>>>>>>
>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
>> >> >>>>>>> 0-2500 writes / second. This will take much too long to consume the
>> >> >>>>>>> entries.
>> >> >>>>>>>
>> >> >>>>>>> I currently believe that the spark kafka receiver is the bottleneck.
>> >> >>>>>>> I've tried both 1.2 receivers, with the WAL and without, and didn't
>> >> >>>>>>> notice any large performance difference. I've tried many different
>> >> >>>>>>> spark configuration options, but can't seem to get better performance.
>> >> >>>>>>>
>> >> >>>>>>> I saw 80000 requests / second inserting these records into kafka using
>> >> >>>>>>> yarn / hbase / protobuf / kafka in a bulk fashion.
>> >> >>>>>>>
>> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd like to
>> >> >>>>>>> at least get 10%.
>> >> >>>>>>>
>> >> >>>>>>> My application looks like
>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>> >> >>>>>>>
>> >> >>>>>>> This is my first spark application. I'd appreciate any assistance.
>> >
>>
>
>

Re: Improving performance of a kafka spark streaming app

Posted by Mich Talebzadeh <mi...@gmail.com>.
I believe you have an issue with performance?

Have you checked the Spark UI (default port 4040) for details such as
shuffles?
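
For reading the batch table quoted further down, a rough sketch of the arithmetic may help. It assumes a 1 second batch interval (the batch times are one second apart), one batch running at a time (Spark's default), and a delay that started near zero and grew steadily. The class and method names are made up for illustration. Under those assumptions, a 1.2 h scheduling delay after 27136 batches implies that each batch takes a little longer than the interval, even though the UI rounds the processing time to 1 s.

```java
// Illustrative only: estimates how far each batch overruns the batch interval
// from the scheduling delay shown in the streaming UI.
final class DelayEstimate {

    /**
     * If batches run one at a time and each takes longer than the interval,
     * batch n waits roughly n * (processing - interval). Solving for the
     * processing time gives interval + delay / n.
     */
    static double impliedProcessingSeconds(double batchIntervalSeconds,
                                           double schedulingDelaySeconds,
                                           long completedBatches) {
        return batchIntervalSeconds + schedulingDelaySeconds / completedBatches;
    }

    /** A streaming job keeps up only if a batch finishes within one interval. */
    static boolean keepsUp(double processingSeconds, double batchIntervalSeconds) {
        return processingSeconds <= batchIntervalSeconds;
    }

    public static void main(String[] args) {
        double interval = 1.0;          // assumed batch interval, seconds
        double delay = 1.2 * 3600.0;    // "1.2 h" from the UI, in seconds
        long batches = 27136;           // completed batches from the UI

        double processing = impliedProcessingSeconds(interval, delay, batches);
        System.out.printf("implied processing time: %.3f s%n", processing);
        System.out.println("keeps up: " + keepsUp(processing, interval));
    }
}
```

If the estimate holds, the job falls behind by roughly 0.16 s per batch, so the delay keeps growing until each batch finishes inside the interval.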

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 18 June 2016 at 20:59, Colin Kincaid Williams <di...@uw.edu> wrote:

> There are 25 nodes in the spark cluster.
>
> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
> <mi...@gmail.com> wrote:
> > how many nodes are in your cluster?
> >
> > --num-executors 6 \
> >  --driver-memory 4G \
> >  --executor-memory 2G \
> >  --total-executor-cores 12 \
> >
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> >
> >
> > On 18 June 2016 at 20:40, Colin Kincaid Williams <di...@uw.edu> wrote:
> >>
> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
> >> Kafka using the direct api and inserts content into an hbase cluster,
> >> as described in this thread. I was away from this project for a while
> >> due to events in my family.
> >>
> >> Currently my scheduling delay is high, but the processing time is
> >> stable around a second. I changed my setup to use 6 kafka partitions
> >> on a set of smaller kafka brokers, with fewer disks. I've included
> >> some details below, including the script I use to launch the
> >> application. I'm using a Spark on Hbase library, whose version is
> >> relevant to my Hbase cluster. Is it apparent there is something wrong
> >> with my launch method that could be causing the delay, related to the
> >> included jars?
> >>
> >> Or is there something wrong with the very simple approach I'm taking
> >> for the application?
> >>
> >> Any advice is appreciated.
> >>
> >>
> >> The application:
> >>
> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
> >>
> >>
> >> From the streaming UI I get something like:
> >>
> >> Completed Batches (last 1000 out of 27136)
> >>
> >> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
> >> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
> >> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
> >> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
> >>
> >>
> >> Here's how I'm launching the spark application.
> >>
> >>
> >> #!/usr/bin/env bash
> >>
> >> export SPARK_CONF_DIR=/home/colin.williams/spark
> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
> >>
> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
> >>   --class com.example.KafkaToHbase \
> >>   --master spark://spark_master:7077 \
> >>   --deploy-mode client \
> >>   --num-executors 6 \
> >>   --driver-memory 4G \
> >>   --executor-memory 2G \
> >>   --total-executor-cores 12 \
> >>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
> >>   --conf spark.app.name="Kafka To Hbase" \
> >>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
> >>   --conf spark.eventLog.enabled=false \
> >>   --conf spark.eventLog.overwrite=true \
> >>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> >>   --conf spark.streaming.backpressure.enabled=false \
> >>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
> >>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
> >>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
> >>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
> >>
> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <di...@uw.edu>
> >> wrote:
> >> > Thanks Cody, I can see that the partitions are well distributed...
> >> > Then I'm in the process of using the direct api.
> >> >
> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <co...@koeninger.org>
> >> > wrote:
> >> >> 60 partitions in and of itself shouldn't be a big performance issue
> >> >> (as long as producers are distributing across partitions evenly).
> >> >>
> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <discord@uw.edu>
> >> >> wrote:
> >> >>> Thanks again Cody. Regarding the details 66 kafka partitions on 3
> >> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
> >> >>> issue with the receiver was the large number of partitions. I had
> >> >>> miscounted the disks and so 11*3*2 is how I decided to partition my
> >> >>> topic on insertion, ( by my own, unjustified reasoning, on a first
> >> >>> attempt ) . This worked well enough for me, I put 1.7 billion entries
> >> >>> into Kafka on a map reduce job in 5 and a half hours.
> >> >>>
> >> >>> I was concerned using spark 1.5.2 because I'm currently putting my
> >> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library jars
> >> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
> >> >>> yesterday, I tried building against 1.5.2. So far it's running without
> >> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
> >> >>> improvement using the same code, but I'll see how the direct api
> >> >>> handles it. In the end I can reduce the number of partitions in Kafka
> >> >>> if it causes big performance issues.
> >> >>>
> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <co...@koeninger.org>
> >> >>> wrote:
> >> >>>> print() isn't really the best way to benchmark things, since it calls
> >> >>>> take(10) under the covers, but 380 records / second for a single
> >> >>>> receiver doesn't sound right in any case.
> >> >>>>
> >> >>>> Am I understanding correctly that you're trying to process a large
> >> >>>> number of already-existing kafka messages, not keep up with an
> >> >>>> incoming stream?  Can you give any details (e.g. hardware, number of
> >> >>>> topicpartitions, etc)?
> >> >>>>
> >> >>>> Really though, I'd try to start with spark 1.6 and direct streams, or
> >> >>>> even just kafkacat, as a baseline.
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
> >> >>>> <di...@uw.edu> wrote:
> >> >>>>> Hello again. I searched for "backport kafka" in the list archives but
> >> >>>>> couldn't find anything but a post from Spark 0.7.2 . I was going to
> >> >>>>> use accumulators to make a counter, but then saw on the Streaming tab
> >> >>>>> the Receiver Statistics. Then I removed all other "functionality"
> >> >>>>> except:
> >> >>>>>
> >> >>>>>
> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
> >> >>>>>       // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
> >> >>>>>       //   Class<V> valueTypeClass, Class<U> keyDecoderClass,
> >> >>>>>       //   Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams,
> >> >>>>>       //   java.util.Map<String,Integer> topics, StorageLevel storageLevel)
> >> >>>>>       .createStream(jssc, byte[].class, byte[].class,
> >> >>>>>           kafka.serializer.DefaultDecoder.class,
> >> >>>>>           kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
> >> >>>>>           StorageLevel.MEMORY_AND_DISK_SER());
> >> >>>>>
> >> >>>>>     dstream.print();
> >> >>>>>
> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing around
> >> >>>>> 380 records / second. Then to get anywhere near my 10% mentioned
> >> >>>>> above, I'd need to run around 21 receivers, assuming 380 records /
> >> >>>>> second, just using the print output. This seems awfully high to me,
> >> >>>>> considering that I wrote 80000+ records a second to Kafka from a
> >> >>>>> mapreduce job, and that my bottleneck was likely Hbase. Again using
> >> >>>>> the 380 estimate, I would need 200+ receivers to reach a similar
> >> >>>>> amount of reads.
> >> >>>>>
> >> >>>>> Even given the issues with the 1.2 receivers, is this the expected way
> >> >>>>> to use the Kafka streaming API, or am I doing something terribly
> >> >>>>> wrong?
> >> >>>>>
> >> >>>>> My application looks like
> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
> >> >>>>>
> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <cody@koeninger.org>
> >> >>>>> wrote:
> >> >>>>>> Have you tested for read throughput (without writing to hbase, just
> >> >>>>>> deserialize)?
> >> >>>>>>
> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?  The
> >> >>>>>> kafka direct stream is available starting with 1.3.  If you're stuck
> >> >>>>>> on 1.2, I believe there have been some attempts to backport it,
> >> >>>>>> search the mailing list archives.
> >> >>>>>>
> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
> >> >>>>>> <di...@uw.edu> wrote:
> >> >>>>>>> I've written an application to get content from a kafka topic with 1.7
> >> >>>>>>> billion entries,  get the protobuf serialized entries, and insert into
> >> >>>>>>> hbase. Currently the environment that I'm running in is Spark 1.2.
> >> >>>>>>>
> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
> >> >>>>>>> 0-2500 writes / second. This will take much too long to consume the
> >> >>>>>>> entries.
> >> >>>>>>>
> >> >>>>>>> I currently believe that the spark kafka receiver is the bottleneck.
> >> >>>>>>> I've tried both 1.2 receivers, with the WAL and without, and didn't
> >> >>>>>>> notice any large performance difference. I've tried many different
> >> >>>>>>> spark configuration options, but can't seem to get better performance.
> >> >>>>>>>
> >> >>>>>>> I saw 80000 requests / second inserting these records into kafka using
> >> >>>>>>> yarn / hbase / protobuf / kafka in a bulk fashion.
> >> >>>>>>>
> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd like to
> >> >>>>>>> at least get 10%.
> >> >>>>>>>
> >> >>>>>>> My application looks like
> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
> >> >>>>>>>
> >> >>>>>>> This is my first spark application. I'd appreciate any assistance.
> >
>

Re: Improving performance of a kafka spark streaming app

Posted by Colin Kincaid Williams <di...@uw.edu>.
There are 25 nodes in the spark cluster.
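
As a rough sketch of why the node count may matter less than the settings in the launch script quoted below: the direct stream creates one Spark partition per Kafka partition, and spark.streaming.kafka.maxRatePerPartition caps the records read per partition per second. The 1 second batch interval is an assumption based on the batch times in the UI output, and the class and method names are illustrative only.

```java
// Back-of-the-envelope numbers for the direct stream, using figures from this
// thread. Names are hypothetical; nothing here calls into Spark itself.
final class RateCap {

    /** Upper bound on records per batch when maxRatePerPartition is set. */
    static long maxRecordsPerBatch(int kafkaPartitions,
                                   long maxRatePerPartition,
                                   double batchSeconds) {
        return Math.round(kafkaPartitions * maxRatePerPartition * batchSeconds);
    }

    /** Hours needed to read a fixed backlog at a steady rate. */
    static double hoursToDrain(long backlogRecords, double recordsPerSecond) {
        return backlogRecords / recordsPerSecond / 3600.0;
    }

    /** Cores that can be busy reading Kafka: one task per Kafka partition. */
    static int busyCores(int kafkaPartitions, int totalExecutorCores) {
        return Math.min(kafkaPartitions, totalExecutorCores);
    }

    public static void main(String[] args) {
        double batchSeconds = 1.0;  // assumed batch interval
        long perBatch = maxRecordsPerBatch(6, 500, batchSeconds);
        double perSecond = perBatch / batchSeconds;

        System.out.println("cap per batch: " + perBatch);
        System.out.printf("hours for 1.7 billion records: %.1f%n",
                hoursToDrain(1_700_000_000L, perSecond));
        System.out.println("cores busy reading: " + busyCores(6, 12));
    }
}
```

With these settings the cap works out to the 3000 events per batch shown in the streaming UI, and reading 1.7 billion records at that rate would take about 157 hours regardless of how many nodes are available.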

On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
<mi...@gmail.com> wrote:
> how many nodes are in your cluster?
>
> --num-executors 6 \
>  --driver-memory 4G \
>  --executor-memory 2G \
>  --total-executor-cores 12 \
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 18 June 2016 at 20:40, Colin Kincaid Williams <di...@uw.edu> wrote:
>>
>> I updated my app to Spark 1.5.2 streaming so that it consumes from
>> Kafka using the direct api and inserts content into an hbase cluster,
>> as described in this thread. I was away from this project for a while
>> due to events in my family.
>>
>> Currently my scheduling delay is high, but the processing time is
>> stable around a second. I changed my setup to use 6 kafka partitions
>> on a set of smaller kafka brokers, with fewer disks. I've included
>> some details below, including the script I use to launch the
>> application. I'm using a Spark on Hbase library, whose version is
>> relevant to my Hbase cluster. Is it apparent there is something wrong
>> with my launch method that could be causing the delay, related to the
>> included jars?
>>
>> Or is there something wrong with the very simple approach I'm taking
>> for the application?
>>
>> Any advice is appreciated.
>>
>>
>> The application:
>>
>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>
>>
>> From the streaming UI I get something like:
>>
>> Completed Batches (last 1000 out of 27136)
>>
>> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
>> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
>> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
>> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>>
>>
>> Here's how I'm launching the spark application.
>>
>>
>> #!/usr/bin/env bash
>>
>> export SPARK_CONF_DIR=/home/colin.williams/spark
>> export HADOOP_CONF_DIR=/etc/hadoop/conf
>> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>
>> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>   --class com.example.KafkaToHbase \
>>   --master spark://spark_master:7077 \
>>   --deploy-mode client \
>>   --num-executors 6 \
>>   --driver-memory 4G \
>>   --executor-memory 2G \
>>   --total-executor-cores 12 \
>>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>   --conf spark.app.name="Kafka To Hbase" \
>>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>   --conf spark.eventLog.enabled=false \
>>   --conf spark.eventLog.overwrite=true \
>>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>   --conf spark.streaming.backpressure.enabled=false \
>>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>
>> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <di...@uw.edu>
>> wrote:
>> > Thanks Cody, I can see that the partitions are well distributed...
>> > Then I'm in the process of using the direct api.
>> >
>> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <co...@koeninger.org>
>> > wrote:
>> >> 60 partitions in and of itself shouldn't be a big performance issue
>> >> (as long as producers are distributing across partitions evenly).
>> >>
>> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <di...@uw.edu>
>> >> wrote:
>> >>> Thanks again Cody. Regarding the details 66 kafka partitions on 3
>> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>> >>> issue with the receiver was the large number of partitions. I had
>> >>> miscounted the disks and so 11*3*2 is how I decided to partition my
>> >>> topic on insertion, ( by my own, unjustified reasoning, on a first
>> >>> attempt ) . This worked well enough for me, I put 1.7 billion entries
>> >>> into Kafka on a map reduce job in 5 and a half hours.
>> >>>
>> >>> I was concerned using spark 1.5.2 because I'm currently putting my
>> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library jars
>> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>> >>> yesterday, I tried building against 1.5.2. So far it's running without
>> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
>> >>> improvement using the same code, but I'll see how the direct api
>> >>> handles it. In the end I can reduce the number of partitions in Kafka
>> >>> if it causes big performance issues.
>> >>>
>> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <co...@koeninger.org>
>> >>> wrote:
>> >>>> print() isn't really the best way to benchmark things, since it calls
>> >>>> take(10) under the covers, but 380 records / second for a single
>> >>>> receiver doesn't sound right in any case.
>> >>>>
>> >>>> Am I understanding correctly that you're trying to process a large
>> >>>> number of already-existing kafka messages, not keep up with an
>> >>>> incoming stream?  Can you give any details (e.g. hardware, number of
>> >>>> topicpartitions, etc)?
>> >>>>
>> >>>> Really though, I'd try to start with spark 1.6 and direct streams, or
>> >>>> even just kafkacat, as a baseline.
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>> >>>> <di...@uw.edu> wrote:
>> >>>>> Hello again. I searched for "backport kafka" in the list archives
>> >>>>> but
>> >>>>> couldn't find anything but a post from Spark 0.7.2 . I was going to
>> >>>>> use accumulators to make a counter, but then saw on the Streaming
>> >>>>> tab
>> >>>>> the Receiver Statistics. Then I removed all other "functionality"
>> >>>>> except:
>> >>>>>
>> >>>>>
>> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>> >>>>>       // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
>> >>>>>       //   Class<V> valueTypeClass, Class<U> keyDecoderClass,
>> >>>>>       //   Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams,
>> >>>>>       //   java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>> >>>>>       .createStream(jssc, byte[].class, byte[].class,
>> >>>>>           kafka.serializer.DefaultDecoder.class,
>> >>>>>           kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
>> >>>>>           StorageLevel.MEMORY_AND_DISK_SER());
>> >>>>>
>> >>>>>     dstream.print();
>> >>>>>
>> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
>> >>>>> around
>> >>>>> 380 records / second. Then to get anywhere near my 10% mentioned
>> >>>>> above, I'd need to run around 21 receivers, assuming 380 records /
>> >>>>> second, just using the print output. This seems awfully high to me,
>> >>>>> considering that I wrote 80000+ records a second to Kafka from a
>> >>>>> mapreduce job, and that my bottleneck was likely Hbase. Again using
>> >>>>> the 380 estimate, I would need 200+ receivers to reach a similar
>> >>>>> amount of reads.
>> >>>>>
>> >>>>> Even given the issues with the 1.2 receivers, is this the expected
>> >>>>> way
>> >>>>> to use the Kafka streaming API, or am I doing something terribly
>> >>>>> wrong?
>> >>>>>
>> >>>>> My application looks like
>> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>> >>>>>
>> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <co...@koeninger.org>
>> >>>>> wrote:
>> >>>>>> Have you tested for read throughput (without writing to hbase, just
>> >>>>>> deserialize)?
>> >>>>>>
>> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?  The
>> >>>>>> kafka direct stream is available starting with 1.3.  If you're
>> >>>>>> stuck
>> >>>>>> on 1.2, I believe there have been some attempts to backport it,
>> >>>>>> search
>> >>>>>> the mailing list archives.
>> >>>>>>
>> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>> >>>>>> <di...@uw.edu> wrote:
>> >>>>>>> I've written an application to get content from a kafka topic with
>> >>>>>>> 1.7
>> >>>>>>> billion entries,  get the protobuf serialized entries, and insert
>> >>>>>>> into
>> >>>>>>> hbase. Currently the environment that I'm running in is Spark 1.2.
>> >>>>>>>
>> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
>> >>>>>>> 0-2500 writes / second. This will take much too long to consume
>> >>>>>>> the
>> >>>>>>> entries.
>> >>>>>>>
>> >>>>>>> I currently believe that the spark kafka receiver is the
>> >>>>>>> bottleneck.
>> >>>>>>> I've tried both 1.2 receivers, with the WAL and without, and
>> >>>>>>> didn't
>> >>>>>>> notice any large performance difference. I've tried many different
>> >>>>>>> spark configuration options, but can't seem to get better
>> >>>>>>> performance.
>> >>>>>>>
>> >>>>>>> I saw 80000 requests / second inserting these records into kafka
>> >>>>>>> using
>> >>>>>>> yarn / hbase / protobuf / kafka in a bulk fashion.
>> >>>>>>>
>> >>>>>>> While hbase inserts might not deliver the same throughput, I'd
>> >>>>>>> like to
>> >>>>>>> at least get 10%.
>> >>>>>>>
>> >>>>>>> My application looks like
>> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>> >>>>>>>
>> >>>>>>> This is my first spark application. I'd appreciate any assistance.


Re: Improving performance of a kafka spark streaming app

Posted by Mich Talebzadeh <mi...@gmail.com>.
How many nodes are in your cluster?

--num-executors 6 \
 --driver-memory 4G \
 --executor-memory 2G \
 --total-executor-cores 12 \


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 18 June 2016 at 20:40, Colin Kincaid Williams <di...@uw.edu> wrote:

> I updated my app to Spark 1.5.2 streaming so that it consumes from
> Kafka using the direct api and inserts content into an hbase cluster,
> as described in this thread. I was away from this project for a while
> due to events in my family.
>
> Currently my scheduling delay is high, but the processing time is
> stable around a second. I changed my setup to use 6 kafka partitions
> on a set of smaller kafka brokers, with fewer disks. I've included
> some details below, including the script I use to launch the
> application. I'm using a Spark on Hbase library, whose version is
> relevant to my Hbase cluster. Is it apparent there is something wrong
> with my launch method that could be causing the delay, related to the
> included jars?
>
> Or is there something wrong with the very simple approach I'm taking
> for the application?
>
> Any advice is appreciated.
>
>
> The application:
>
> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>
>
> From the streaming UI I get something like:
>
> Completed Batches (last 1000 out of 27136)
>
> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>
>
> Here's how I'm launching the spark application.
>
>
> #!/usr/bin/env bash
>
> export SPARK_CONF_DIR=/home/colin.williams/spark
> export HADOOP_CONF_DIR=/etc/hadoop/conf
> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>
> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>   --class com.example.KafkaToHbase \
>   --master spark://spark_master:7077 \
>   --deploy-mode client \
>   --num-executors 6 \
>   --driver-memory 4G \
>   --executor-memory 2G \
>   --total-executor-cores 12 \
>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>   --conf spark.app.name="Kafka To Hbase" \
>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>   --conf spark.eventLog.enabled=false \
>   --conf spark.eventLog.overwrite=true \
>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>   --conf spark.streaming.backpressure.enabled=false \
>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>
> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <di...@uw.edu>
> wrote:
> > Thanks Cody, I can see that the partitions are well distributed...
> > Then I'm in the process of using the direct api.
> >
> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
> >> 60 partitions in and of itself shouldn't be a big performance issue
> >> (as long as producers are distributing across partitions evenly).
> >>
> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <di...@uw.edu>
> wrote:
> >>> Thanks again Cody. Regarding the details 66 kafka partitions on 3
> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
> >>> issue with the receiver was the large number of partitions. I had
> >>> miscounted the disks and so 11*3*2 is how I decided to partition my
> >>> topic on insertion, ( by my own, unjustified reasoning, on a first
> >>> attempt ) . This worked well enough for me, I put 1.7 billion entries
> >>> into Kafka on a map reduce job in 5 and a half hours.
> >>>
> >>> I was concerned using spark 1.5.2 because I'm currently putting my
> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library jars
> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
> >>> yesterday, I tried building against 1.5.2. So far it's running without
> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
> >>> improvement using the same code, but I'll see how the direct api
> >>> handles it. In the end I can reduce the number of partitions in Kafka
> >>> if it causes big performance issues.
> >>>

Re: Improving performance of a kafka spark streaming app

Posted by Colin Kincaid Williams <di...@uw.edu>.
I updated my app to Spark 1.5.2 streaming so that it consumes from
Kafka using the direct api and inserts content into an hbase cluster,
as described in this thread. I was away from this project for a while
due to events in my family.

Currently my scheduling delay is high, but the processing time is
stable around a second. I changed my setup to use 6 kafka partitions
on a set of smaller kafka brokers, with fewer disks. I've included
some details below, including the script I use to launch the
application. I'm using a Spark on Hbase library whose version matches
my Hbase cluster. Is there anything apparently wrong with my launch
method, such as the included jars, that could be causing the delay?

Or is there something wrong with the very simple approach I'm taking
for the application?

Any advice is appreciated.


The application:

https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877


From the streaming UI I get something like:

Completed Batches (last 1000 out of 27136)

Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
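
For what it's worth, the 3000 events per batch in the table line up with the rate limit in the launch settings: 6 partitions times spark.streaming.kafka.maxRatePerPartition=500. The sketch below is plain Java arithmetic, not Spark code. It assumes a 1 second batch interval, which is only inferred from the spacing of the batch times, and the class and method names are made up for illustration.

```java
public class BatchCeiling {
    // Upper bound on records per batch for the direct stream when
    // spark.streaming.kafka.maxRatePerPartition is set.
    public static long maxEventsPerBatch(int partitions, long maxRatePerPartition, long batchSeconds) {
        return partitions * maxRatePerPartition * batchSeconds;
    }

    // Hours needed to drain a backlog at a steady records-per-second rate.
    public static double hoursToDrain(long backlog, double recordsPerSecond) {
        return backlog / recordsPerSecond / 3600.0;
    }

    public static void main(String[] args) {
        int partitions = 6;            // from the post
        long maxRate = 500;            // spark.streaming.kafka.maxRatePerPartition
        long batchSeconds = 1;         // assumption: inferred from the batch times
        long backlog = 1_700_000_000L; // entries in the topic

        long perBatch = maxEventsPerBatch(partitions, maxRate, batchSeconds);
        double perSecond = (double) perBatch / batchSeconds;
        System.out.println("max events per batch: " + perBatch);
        System.out.println("hours to drain backlog: " + hoursToDrain(backlog, perSecond));
    }
}
```

Under those assumptions the cap is 3000 records per second, so draining 1.7 billion entries would take roughly 157 hours however fast each batch runs. Separately, if processing takes even slightly longer than the batch interval, batches queue up and the scheduling delay keeps growing.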


Here's how I'm launching the spark application.


#!/usr/bin/env bash

export SPARK_CONF_DIR=/home/colin.williams/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar

/opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
  --class com.example.KafkaToHbase \
  --master spark://spark_master:7077 \
  --deploy-mode client \
  --num-executors 6 \
  --driver-memory 4G \
  --executor-memory 2G \
  --total-executor-cores 12 \
  --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
  --conf spark.app.name="Kafka To Hbase" \
  --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
  --conf spark.eventLog.enabled=false \
  --conf spark.eventLog.overwrite=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.streaming.backpressure.enabled=false \
  --conf spark.streaming.kafka.maxRatePerPartition=500 \
  --driver-class-path /home/colin.williams/kafka-hbase.jar \
  --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
  /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"



Re: Improving performance of a kafka spark streaming app

Posted by Colin Kincaid Williams <di...@uw.edu>.
Thanks Cody, I can see that the partitions are well distributed.
I'm now in the process of switching to the direct api.



Re: Improving performance of a kafka spark streaming app

Posted by Cody Koeninger <co...@koeninger.org>.
60 partitions in and of itself shouldn't be a big performance issue
(as long as producers are distributing across partitions evenly).



Re: Improving performance of a kafka spark streaming app

Posted by Colin Kincaid Williams <di...@uw.edu>.
Thanks again Cody. Regarding the details: 66 kafka partitions on 3
kafka servers, likely 8 core systems with 10 disks each. Maybe the
issue with the receiver was the large number of partitions. I had
miscounted the disks, so 11*3*2 is how I decided to partition my
topic on insertion (by my own, unjustified reasoning, on a first
attempt). This worked well enough for me: I put 1.7 billion entries
into Kafka with a map reduce job in 5 and a half hours.
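
As a rough cross-check of the "80000+ records a second" figure mentioned earlier in the thread, the average load rate can be worked out from the two numbers above. This is only a back-of-the-envelope sketch in plain Java. The class and method names are made up for illustration, and it assumes the load ran at a steady rate for the whole 5.5 hours.

```java
public class LoadRate {
    // Average records per second for a load of `records` entries over `hours`.
    public static double recordsPerSecond(long records, double hours) {
        return records / (hours * 3600.0);
    }

    public static void main(String[] args) {
        long records = 1_700_000_000L; // entries loaded into Kafka
        double hours = 5.5;            // duration of the map reduce load
        double rate = recordsPerSecond(records, hours);
        // prints 85859
        System.out.println("average load rate (records/s): " + Math.round(rate));
        // prints 8586
        System.out.println("10% of that rate (records/s): " + Math.round(rate * 0.10));
    }
}
```

So the 10% target discussed in the thread works out to somewhere between 8000 and 8600 records per second, depending on which baseline is used.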

I was concerned about using spark 1.5.2 because I'm currently putting my
data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library jars
built for spark 1.2 on CDH 5.3. But after debugging quite a bit
yesterday, I tried building against 1.5.2. So far it's running without
issue on a Spark 1.5.2 cluster. I'm not sure there was too much
improvement using the same code, but I'll see how the direct api
handles it. In the end I can reduce the number of partitions in Kafka
if it causes big performance issues.



Re: Improving performance of a kafka spark streaming app

Posted by Cody Koeninger <co...@koeninger.org>.
print() isn't really the best way to benchmark things, since it calls
take(10) under the covers, but 380 records / second for a single
receiver doesn't sound right in any case.

Am I understanding correctly that you're trying to process a large
number of already-existing kafka messages, not keep up with an
incoming stream?  Can you give any details (e.g. hardware, number of
topicpartitions, etc)?

Really though, I'd try to start with spark 1.6 and direct streams, or
even just kafkacat, as a baseline.
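
For scale on the backlog question, here is a minimal sketch of how long a
one-off drain would take at the rates quoted in this thread (1.7 billion
entries; 2500 writes / second at best today; 10% of the 80000 / second seen
when loading Kafka). The class and method names are hypothetical, and it
assumes a constant sustained rate, which a real job will not have.

```java
// Rough drain-time estimate for a fixed backlog at a constant rate.
// All figures come from the thread; the constant-rate model is an assumption.
final class BacklogEstimate {

    private static final double SECONDS_PER_DAY = 86_400.0;

    // Days needed to consume `records` at `recordsPerSec`.
    static double daysToDrain(long records, double recordsPerSec) {
        if (records < 0 || recordsPerSec <= 0) {
            throw new IllegalArgumentException("records must be >= 0 and rate > 0");
        }
        return records / recordsPerSec / SECONDS_PER_DAY;
    }

    public static void main(String[] args) {
        long backlog = 1_700_000_000L; // entries in the topic

        System.out.printf("at  2500/s: %.2f days%n", daysToDrain(backlog, 2_500));
        System.out.printf("at  8000/s: %.2f days%n", daysToDrain(backlog, 8_000));
        System.out.printf("at 80000/s: %.2f days%n", daysToDrain(backlog, 80_000));
    }
}
```

At the best rate seen so far this works out to roughly eight days, and at the
10% target to about two and a half, so the read path is worth measuring on
its own before tuning the hbase writes.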



On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
> Hello again. I searched for "backport kafka" in the list archives but
> couldn't find anything but a post from Spark 0.7.2. I was going to
> use accumulators to make a counter, but then saw on the Streaming tab
> the Receiver Statistics. Then I removed all other "functionality"
> except:
>
>
>     // Signature: createStream(JavaStreamingContext jssc,
>     //     Class<K> keyTypeClass, Class<V> valueTypeClass,
>     //     Class<U> keyDecoderClass, Class<T> valueDecoderClass,
>     //     java.util.Map<String,String> kafkaParams,
>     //     java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>     JavaPairReceiverInputDStream<byte[], byte[]> dstream =
>         KafkaUtils.createStream(jssc, byte[].class, byte[].class,
>             kafka.serializer.DefaultDecoder.class,
>             kafka.serializer.DefaultDecoder.class,
>             kafkaParamsMap, topicMap,
>             StorageLevel.MEMORY_AND_DISK_SER());
>
>     dstream.print();
>
> Then in the Receiver Stats for the single receiver, I'm seeing around
> 380 records / second. Then to get anywhere near my 10% mentioned
> above, I'd need to run around 21 receivers, assuming 380 records /
> second, just using the print output. This seems awfully high to me,
> considering that I wrote 80000+ records a second to Kafka from a
> mapreduce job, and that my bottleneck was likely Hbase. Again using
> the 380 estimate, I would need 200+ receivers to reach a similar
> amount of reads.
>
> Even given the issues with the 1.2 receivers, is this the expected way
> to use the Kafka streaming API, or am I doing something terribly
> wrong?
>
> My application looks like
> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>
> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <co...@koeninger.org> wrote:
>> Have you tested for read throughput (without writing to hbase, just
>> deserialize)?
>>
>> Are you limited to using spark 1.2, or is upgrading possible?  The
>> kafka direct stream is available starting with 1.3.  If you're stuck
>> on 1.2, I believe there have been some attempts to backport it, search
>> the mailing list archives.
>>
>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>>> I've written an application to get content from a kafka topic with 1.7
>>> billion entries,  get the protobuf serialized entries, and insert into
>>> hbase. Currently the environment that I'm running in is Spark 1.2.
>>>
>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
>>> 0-2500 writes / second. This will take much too long to consume the
>>> entries.
>>>
>>> I currently believe that the spark kafka receiver is the bottleneck.
>>> I've tried both 1.2 receivers, with the WAL and without, and didn't
>>> notice any large performance difference. I've tried many different
>>> spark configuration options, but can't seem to get better performance.
>>>
>>> I saw 80000 requests / second inserting these records into kafka using
>>> yarn / hbase / protobuf / kafka in a bulk fashion.
>>>
>>> While hbase inserts might not deliver the same throughput, I'd like to
>>> at least get 10%.
>>>
>>> My application looks like
>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>
>>> This is my first spark application. I'd appreciate any assistance.


Re: Improving performance of a kafka spark streaming app

Posted by Colin Kincaid Williams <di...@uw.edu>.
Hello again. I searched for "backport kafka" in the list archives but
couldn't find anything but a post from Spark 0.7.2. I was going to
use accumulators to make a counter, but then saw on the Streaming tab
the Receiver Statistics. Then I removed all other "functionality"
except:


    // Signature: createStream(JavaStreamingContext jssc,
    //     Class<K> keyTypeClass, Class<V> valueTypeClass,
    //     Class<U> keyDecoderClass, Class<T> valueDecoderClass,
    //     java.util.Map<String,String> kafkaParams,
    //     java.util.Map<String,Integer> topics, StorageLevel storageLevel)
    JavaPairReceiverInputDStream<byte[], byte[]> dstream =
        KafkaUtils.createStream(jssc, byte[].class, byte[].class,
            kafka.serializer.DefaultDecoder.class,
            kafka.serializer.DefaultDecoder.class,
            kafkaParamsMap, topicMap,
            StorageLevel.MEMORY_AND_DISK_SER());

    dstream.print();

Then in the Receiver Stats for the single receiver, I'm seeing around
380 records / second. Then to get anywhere near my 10% mentioned
above, I'd need to run around 21 receivers, assuming 380 records /
second, just using the print output. This seems awfully high to me,
considering that I wrote 80000+ records a second to Kafka from a
mapreduce job, and that my bottleneck was likely Hbase. Again using
the 380 estimate, I would need 200+ receivers to reach a similar
amount of reads.
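
As a rough check of that arithmetic, a minimal sketch using only the figures
above (380 records / second per receiver, 80000 records / second written to
Kafka). The class and method names are hypothetical, and it assumes
throughput scales linearly with the number of receivers, which is not
guaranteed in practice.

```java
// Back-of-the-envelope receiver count from the rates quoted in this message.
// Linear scaling across receivers is an assumption, not a measured result.
final class ReceiverEstimate {

    // Smallest whole number of receivers whose combined rate reaches targetPerSec.
    static long receiversNeeded(long targetPerSec, long perReceiverPerSec) {
        if (targetPerSec < 0 || perReceiverPerSec <= 0) {
            throw new IllegalArgumentException("target must be >= 0 and per-receiver rate > 0");
        }
        // Ceiling division: round up so the target is actually met.
        return (targetPerSec + perReceiverPerSec - 1) / perReceiverPerSec;
    }

    public static void main(String[] args) {
        long kafkaWriteRate = 80_000;          // records/s when loading Kafka
        long perReceiver = 380;                // records/s in Receiver Statistics
        long tenPercent = kafkaWriteRate / 10; // the 10% goal: 8000 records/s

        System.out.println("receivers for 10%:  " + receiversNeeded(tenPercent, perReceiver));
        System.out.println("receivers for 100%: " + receiversNeeded(kafkaWriteRate, perReceiver));
    }
}
```

Rounding up gives 22 receivers for the 10% goal and 211 for the full rate,
in line with the "around 21" and "200+" figures above.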

Even given the issues with the 1.2 receivers, is this the expected way
to use the Kafka streaming API, or am I doing something terribly
wrong?

My application looks like
https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877

On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <co...@koeninger.org> wrote:
> Have you tested for read throughput (without writing to hbase, just
> deserialize)?
>
> Are you limited to using spark 1.2, or is upgrading possible?  The
> kafka direct stream is available starting with 1.3.  If you're stuck
> on 1.2, I believe there have been some attempts to backport it, search
> the mailing list archives.
>
> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>> I've written an application to get content from a kafka topic with 1.7
>> billion entries,  get the protobuf serialized entries, and insert into
>> hbase. Currently the environment that I'm running in is Spark 1.2.
>>
>> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
>> 0-2500 writes / second. This will take much too long to consume the
>> entries.
>>
>> I currently believe that the spark kafka receiver is the bottleneck.
>> I've tried both 1.2 receivers, with the WAL and without, and didn't
>> notice any large performance difference. I've tried many different
>> spark configuration options, but can't seem to get better performance.
>>
>> I saw 80000 requests / second inserting these records into kafka using
>> yarn / hbase / protobuf / kafka in a bulk fashion.
>>
>> While hbase inserts might not deliver the same throughput, I'd like to
>> at least get 10%.
>>
>> My application looks like
>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>
>> This is my first spark application. I'd appreciate any assistance.


Re: Improving performance of a kafka spark streaming app

Posted by Colin Kincaid Williams <di...@uw.edu>.
Hi Cody,

  I'm going to use an accumulator right now to get an idea of the
throughput. Thanks for mentioning the backported module. Also it
looks like I missed this section:
https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch
from the docs. Then maybe I should try creating multiple streams to
get more throughput?

Thanks,

Colin Williams

On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <co...@koeninger.org> wrote:
> Have you tested for read throughput (without writing to hbase, just
> deserialize)?
>
> Are you limited to using spark 1.2, or is upgrading possible?  The
> kafka direct stream is available starting with 1.3.  If you're stuck
> on 1.2, I believe there have been some attempts to backport it, search
> the mailing list archives.
>
> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
>> I've written an application to get content from a kafka topic with 1.7
>> billion entries,  get the protobuf serialized entries, and insert into
>> hbase. Currently the environment that I'm running in is Spark 1.2.
>>
>> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
>> 0-2500 writes / second. This will take much too long to consume the
>> entries.
>>
>> I currently believe that the spark kafka receiver is the bottleneck.
>> I've tried both 1.2 receivers, with the WAL and without, and didn't
>> notice any large performance difference. I've tried many different
>> spark configuration options, but can't seem to get better performance.
>>
>> I saw 80000 requests / second inserting these records into kafka using
>> yarn / hbase / protobuf / kafka in a bulk fashion.
>>
>> While hbase inserts might not deliver the same throughput, I'd like to
>> at least get 10%.
>>
>> My application looks like
>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>
>> This is my first spark application. I'd appreciate any assistance.


Re: Improving performance of a kafka spark streaming app

Posted by Cody Koeninger <co...@koeninger.org>.
Have you tested for read throughput (without writing to hbase, just
deserialize)?

Are you limited to using spark 1.2, or is upgrading possible?  The
kafka direct stream is available starting with 1.3.  If you're stuck
on 1.2, I believe there have been some attempts to backport it, search
the mailing list archives.

On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams <di...@uw.edu> wrote:
> I've written an application to get content from a kafka topic with 1.7
> billion entries,  get the protobuf serialized entries, and insert into
> hbase. Currently the environment that I'm running in is Spark 1.2.
>
> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
> 0-2500 writes / second. This will take much too long to consume the
> entries.
>
> I currently believe that the spark kafka receiver is the bottleneck.
> I've tried both 1.2 receivers, with the WAL and without, and didn't
> notice any large performance difference. I've tried many different
> spark configuration options, but can't seem to get better performance.
>
> I saw 80000 requests / second inserting these records into kafka using
> yarn / hbase / protobuf / kafka in a bulk fashion.
>
> While hbase inserts might not deliver the same throughput, I'd like to
> at least get 10%.
>
> My application looks like
> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>
> This is my first spark application. I'd appreciate any assistance.


Re: Improving performance of a kafka spark streaming app

Posted by Colin Kincaid Williams <di...@uw.edu>.
Hi David,

 My current concern is that I'm using a spark hbase bulk put driver
written for Spark 1.2 on the version of CDH my spark / yarn cluster is
running on. Even if I were to run on another Spark cluster, I'm
concerned that I might have issues making the put requests into hbase.
However, I should give it a shot if I abandon Spark 1.2 and my current
environment.

Thanks,

Colin Williams

On Mon, May 2, 2016 at 6:06 PM, Krieg, David
<Da...@earlywarning.com> wrote:
> Spark 1.2 is a little old and busted. I think most of the advice you'll get is
> to try to use at least Spark 1.3, which introduced a new Spark streaming mode:
> the receiver-less "direct streaming" interface, which is what we use. The 1.2
> receiver-based implementation had a number of shortcomings. You'll get more
> joy the more you upgrade Spark, at least to some extent.
>
> David Krieg | Enterprise Software Engineer
> Early Warning
> Direct: 480.426.2171 | Fax: 480.483.4628 | Mobile: 859.227.6173
>
>
> -----Original Message-----
> From: Colin Kincaid Williams [mailto:discord@uw.edu]
> Sent: Monday, May 02, 2016 10:55 AM
> To: user@spark.apache.org
> Subject: Improving performance of a kafka spark streaming app
>
> I've written an application to get content from a kafka topic with 1.7 billion
> entries,  get the protobuf serialized entries, and insert into hbase.
> Currently the environment that I'm running in is Spark 1.2.
>
> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
> 0-2500 writes / second. This will take much too long to consume the entries.
>
> I currently believe that the spark kafka receiver is the bottleneck.
> I've tried both 1.2 receivers, with the WAL and without, and didn't notice any
> large performance difference. I've tried many different spark configuration
> options, but can't seem to get better performance.
>
> I saw 80000 requests / second inserting these records into kafka using yarn /
> hbase / protobuf / kafka in a bulk fashion.
>
> While hbase inserts might not deliver the same throughput, I'd like to at
> least get 10%.
>
> My application looks like
> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>
> This is my first spark application. I'd appreciate any assistance.