You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Shushant Arora <sh...@gmail.com> on 2015/07/20 13:22:13 UTC

spark streaming 1.3 issues

Hi

1.I am using spark streaming 1.3 for reading from a kafka queue and pushing
events to external source.

I passed in my job 20 executors but it is showing only 6 in executor tab ?
When I used highlevel streaming 1.2 - its showing 20 executors. My cluster
is 10 node yarn cluster with each node has 8 cores.

I am calling the script as :

spark-submit --class classname --num-executors 10 --executor-cores 2
--master yarn-client jarfile

2. On Streaming UI

Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
Time since start: 13 minutes 28 seconds
Network receivers: 0
Batch interval: 1 second
Processed batches: 807
Waiting batches: 0
Received records: 0
Processed records: 0

Received records and processed records are always 0 . And Speed of
processing is slow compare to highlevel api.

I am procesing the stream using mapPartition.

When I used
directKafkaStream.foreachRDD(new Function<JavaPairRDD<byte[],byte[]>,
Void>() {
 @Override
public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
// TODO Auto-generated method stub
OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges();
}
}

It throws an exception
java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD cannot
be cast to org.apache.spark.streaming.kafka.HasOffsetRanges

Thanks
Shushant

Re: spark streaming 1.3 issues

Posted by Shushant Arora <sh...@gmail.com>.
In spark streaming 1.3 -

Say I have 10 executors each with 4 cores so in total 40 tasks in parllel
at once. If I repartition kafka directstream to 40 partitions vs say I have
in kafka topic 300 partitions - which one will be more efficient , Should I
repartition the kafka stream equal to num of cores or keep it same as 300?

 If I have number of partitions greater than parllel tasks will that not
cause overhead of task scheduling ?

On Wed, Jul 22, 2015 at 11:37 AM, Tathagata Das <td...@databricks.com> wrote:

> For Java, do
>
> OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd*.rdd()*).
> offsetRanges();
>
> If you fix that error, you should be seeing data.
>
> You can call arbitrary RDD operations on a DStream, using
> DStream.transform. Take a look at the docs.
>
> For the direct kafka approach you are doing,
> - tasks do get launched for empty partitions
> - driver may make multiple calls to Kafka brokers to get all the offset
> information. But that does not mean you should reduce partitions. the whole
> point of having large number of partition is the consume the data in
> parallel. If you reduce the number of partitions, that defeats the purpose
> of having partitoins at all. And the driver making calls for getting
> metadata (i.e. offsets) isnt very costly, nor is it a bottleneck usually.
> Rather receiving and processing the actual data is usually the bottleneck
> and to increase throughput you should have larger number of partitions.
>
>
>
> On Tue, Jul 21, 2015 at 1:02 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> I'd suggest you upgrading to 1.4 as it has better metrices and UI.
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Jul 20, 2015 at 7:01 PM, Shushant Arora <
>> shushantarora09@gmail.com> wrote:
>>
>>> Is coalesce not applicable to kafkaStream ? How to do coalesce on
>>> kafkadirectstream its not there in api ?
>>> Shall calling repartition on directstream with number of executors as
>>> numpartitions will imrove perfromance ?
>>>
>>> Does in 1.3 tasks get launched for partitions which are empty? Does
>>> driver makes call for getting offsets of each partition separately or in
>>> single call it gets all partitions new offsets ? I mean will reducing no of
>>>  partitions oin kafka help improving the performance?
>>>
>>> On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora <
>>> shushantarora09@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> 1.I am using spark streaming 1.3 for reading from a kafka queue and
>>>> pushing events to external source.
>>>>
>>>> I passed in my job 20 executors but it is showing only 6 in executor
>>>> tab ?
>>>> When I used highlevel streaming 1.2 - its showing 20 executors. My
>>>> cluster is 10 node yarn cluster with each node has 8 cores.
>>>>
>>>> I am calling the script as :
>>>>
>>>> spark-submit --class classname --num-executors 10 --executor-cores 2
>>>> --master yarn-client jarfile
>>>>
>>>> 2. On Streaming UI
>>>>
>>>> Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
>>>> Time since start: 13 minutes 28 seconds
>>>> Network receivers: 0
>>>> Batch interval: 1 second
>>>> Processed batches: 807
>>>> Waiting batches: 0
>>>> Received records: 0
>>>> Processed records: 0
>>>>
>>>> Received records and processed records are always 0 . And Speed of
>>>> processing is slow compare to highlevel api.
>>>>
>>>> I am procesing the stream using mapPartition.
>>>>
>>>> When I used
>>>> directKafkaStream.foreachRDD(new Function<JavaPairRDD<byte[],byte[]>,
>>>> Void>() {
>>>>  @Override
>>>> public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
>>>> // TODO Auto-generated method stub
>>>> OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges();
>>>> }
>>>> }
>>>>
>>>> It throws an exception
>>>> java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD
>>>> cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>>>>
>>>> Thanks
>>>> Shushant
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>

Re: spark streaming 1.3 issues

Posted by Tathagata Das <td...@databricks.com>.
For Java, do

OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd*.rdd()*).offsetRanges();

If you fix that error, you should be seeing data.

You can call arbitrary RDD operations on a DStream, using
DStream.transform. Take a look at the docs.

For the direct kafka approach you are doing,
- tasks do get launched for empty partitions
- driver may make multiple calls to Kafka brokers to get all the offset
information. But that does not mean you should reduce partitions. the whole
point of having large number of partition is the consume the data in
parallel. If you reduce the number of partitions, that defeats the purpose
of having partitoins at all. And the driver making calls for getting
metadata (i.e. offsets) isnt very costly, nor is it a bottleneck usually.
Rather receiving and processing the actual data is usually the bottleneck
and to increase throughput you should have larger number of partitions.



On Tue, Jul 21, 2015 at 1:02 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> I'd suggest you upgrading to 1.4 as it has better metrices and UI.
>
> Thanks
> Best Regards
>
> On Mon, Jul 20, 2015 at 7:01 PM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> Is coalesce not applicable to kafkaStream ? How to do coalesce on
>> kafkadirectstream its not there in api ?
>> Shall calling repartition on directstream with number of executors as
>> numpartitions will imrove perfromance ?
>>
>> Does in 1.3 tasks get launched for partitions which are empty? Does
>> driver makes call for getting offsets of each partition separately or in
>> single call it gets all partitions new offsets ? I mean will reducing no of
>>  partitions oin kafka help improving the performance?
>>
>> On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora <
>> shushantarora09@gmail.com> wrote:
>>
>>> Hi
>>>
>>> 1.I am using spark streaming 1.3 for reading from a kafka queue and
>>> pushing events to external source.
>>>
>>> I passed in my job 20 executors but it is showing only 6 in executor tab
>>> ?
>>> When I used highlevel streaming 1.2 - its showing 20 executors. My
>>> cluster is 10 node yarn cluster with each node has 8 cores.
>>>
>>> I am calling the script as :
>>>
>>> spark-submit --class classname --num-executors 10 --executor-cores 2
>>> --master yarn-client jarfile
>>>
>>> 2. On Streaming UI
>>>
>>> Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
>>> Time since start: 13 minutes 28 seconds
>>> Network receivers: 0
>>> Batch interval: 1 second
>>> Processed batches: 807
>>> Waiting batches: 0
>>> Received records: 0
>>> Processed records: 0
>>>
>>> Received records and processed records are always 0 . And Speed of
>>> processing is slow compare to highlevel api.
>>>
>>> I am procesing the stream using mapPartition.
>>>
>>> When I used
>>> directKafkaStream.foreachRDD(new Function<JavaPairRDD<byte[],byte[]>,
>>> Void>() {
>>>  @Override
>>> public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
>>> // TODO Auto-generated method stub
>>> OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges();
>>> }
>>> }
>>>
>>> It throws an exception
>>> java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD
>>> cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>>>
>>> Thanks
>>> Shushant
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>

Re: spark streaming 1.3 issues

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
I'd suggest you upgrading to 1.4 as it has better metrices and UI.

Thanks
Best Regards

On Mon, Jul 20, 2015 at 7:01 PM, Shushant Arora <sh...@gmail.com>
wrote:

> Is coalesce not applicable to kafkaStream ? How to do coalesce on
> kafkadirectstream its not there in api ?
> Shall calling repartition on directstream with number of executors as
> numpartitions will imrove perfromance ?
>
> Does in 1.3 tasks get launched for partitions which are empty? Does driver
> makes call for getting offsets of each partition separately or in single
> call it gets all partitions new offsets ? I mean will reducing no of
>  partitions oin kafka help improving the performance?
>
> On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora <shushantarora09@gmail.com
> > wrote:
>
>> Hi
>>
>> 1.I am using spark streaming 1.3 for reading from a kafka queue and
>> pushing events to external source.
>>
>> I passed in my job 20 executors but it is showing only 6 in executor tab ?
>> When I used highlevel streaming 1.2 - its showing 20 executors. My
>> cluster is 10 node yarn cluster with each node has 8 cores.
>>
>> I am calling the script as :
>>
>> spark-submit --class classname --num-executors 10 --executor-cores 2
>> --master yarn-client jarfile
>>
>> 2. On Streaming UI
>>
>> Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
>> Time since start: 13 minutes 28 seconds
>> Network receivers: 0
>> Batch interval: 1 second
>> Processed batches: 807
>> Waiting batches: 0
>> Received records: 0
>> Processed records: 0
>>
>> Received records and processed records are always 0 . And Speed of
>> processing is slow compare to highlevel api.
>>
>> I am procesing the stream using mapPartition.
>>
>> When I used
>> directKafkaStream.foreachRDD(new Function<JavaPairRDD<byte[],byte[]>,
>> Void>() {
>>  @Override
>> public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
>> // TODO Auto-generated method stub
>> OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges();
>> }
>> }
>>
>> It throws an exception
>> java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD
>> cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>>
>> Thanks
>> Shushant
>>
>>
>>
>>
>>
>>
>>
>

Re: spark streaming 1.3 issues

Posted by Shushant Arora <sh...@gmail.com>.
Is coalesce not applicable to kafkaStream ? How to do coalesce on
kafkadirectstream its not there in api ?
Shall calling repartition on directstream with number of executors as
numpartitions will imrove perfromance ?

Does in 1.3 tasks get launched for partitions which are empty? Does driver
makes call for getting offsets of each partition separately or in single
call it gets all partitions new offsets ? I mean will reducing no of
 partitions oin kafka help improving the performance?

On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora <sh...@gmail.com>
wrote:

> Hi
>
> 1.I am using spark streaming 1.3 for reading from a kafka queue and
> pushing events to external source.
>
> I passed in my job 20 executors but it is showing only 6 in executor tab ?
> When I used highlevel streaming 1.2 - its showing 20 executors. My cluster
> is 10 node yarn cluster with each node has 8 cores.
>
> I am calling the script as :
>
> spark-submit --class classname --num-executors 10 --executor-cores 2
> --master yarn-client jarfile
>
> 2. On Streaming UI
>
> Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
> Time since start: 13 minutes 28 seconds
> Network receivers: 0
> Batch interval: 1 second
> Processed batches: 807
> Waiting batches: 0
> Received records: 0
> Processed records: 0
>
> Received records and processed records are always 0 . And Speed of
> processing is slow compare to highlevel api.
>
> I am procesing the stream using mapPartition.
>
> When I used
> directKafkaStream.foreachRDD(new Function<JavaPairRDD<byte[],byte[]>,
> Void>() {
>  @Override
> public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
> // TODO Auto-generated method stub
> OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges();
> }
> }
>
> It throws an exception
> java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD cannot
> be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>
> Thanks
> Shushant
>
>
>
>
>
>
>