You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Aniket Bhatnagar <an...@gmail.com> on 2014/09/11 13:51:31 UTC

Spark streaming stops computing while the receiver keeps running without any errors reported

Hi all

I am trying to run kinesis spark streaming application on a standalone
spark cluster. The job works find in local mode but when I submit it (using
spark-submit), it doesn't do anything. I enabled logs
for org.apache.spark.streaming.kinesis package and I regularly get the
following in worker logs:

14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
x.x.x.x:b88a9210-cbb9-4c31-8da7-35fd92faba09 stored 34 records for shardId
shardId-000000000000
14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
x.x.x.x:b2e9c20f-470a-44fe-a036-630c776919fb stored 31 records for shardId
shardId-000000000001

But the job does not perform any operations defined on DStream. To
investigate this further, I changed the kinesis-asl's KinesisUtils to
perform the following computation on the DStream created
using ssc.receiverStream(new KinesisReceiver...):

stream.count().foreachRDD(rdd => rdd.foreach(tuple => logInfo("Emitted " +
tuple)))

Even the above line does not results in any corresponding log entries both
in driver and worker logs. The only relevant logs that I could find in
driver logs are:
14/09/11 11:40:58 INFO DAGScheduler: Stage 2 (foreach at
KinesisUtils.scala:68) finished in 0.398 s
14/09/11 11:40:58 INFO SparkContext: Job finished: foreach at
KinesisUtils.scala:68, took 4.926449985 s
14/09/11 11:40:58 INFO JobScheduler: Finished job streaming job
1410435653000 ms.0 from job set of time 1410435653000 ms
14/09/11 11:40:58 INFO JobScheduler: Starting job streaming job
1410435653000 ms.1 from job set of time 1410435653000 ms
14/09/11 11:40:58 INFO SparkContext: Starting job: foreach at
KinesisUtils.scala:68
14/09/11 11:40:58 INFO DAGScheduler: Registering RDD 13 (union at
DStream.scala:489)
14/09/11 11:40:58 INFO DAGScheduler: Got job 3 (foreach at
KinesisUtils.scala:68) with 2 output partitions (allowLocal=false)
14/09/11 11:40:58 INFO DAGScheduler: Final stage: Stage 5(foreach at
KinesisUtils.scala:68)

After the above logs, nothing shows up corresponding to KinesisUtils. I am
out of ideas on this one and any help on this would greatly appreciated.

Thanks,
Aniket

Re: Spark streaming stops computing while the receiver keeps running without any errors reported

Posted by Aniket Bhatnagar <an...@gmail.com>.
Hi all

I was finally able to get this working by setting
the SPARK_EXECUTOR_INSTANCES to a high number. However, I am wondering if
this is a bug because the app gets submitted but ceases to run because it
can't run desired number of workers. Shouldn't the app be rejected if it
cant be run on the cluster?

Thanks,
Aniket

On 22 September 2014 18:14, Aniket Bhatnagar <an...@gmail.com>
wrote:

> Hi all
>
> I was finally able to figure out why this streaming appeared stuck. The
> reason was that I was running out of workers in my standalone deployment of
> Spark. There was no feedback in the logs which is why it took a little time
> for me to figure out.
>
> However, now I am trying to run the same in yarn-client mode and this is
> again giving the same problem. Is it possible to run out of workers in YARN
> mode as well? If so, how can I figure that out?
>
> Thanks,
> Aniket
>
> On 19 September 2014 18:07, Aniket Bhatnagar <an...@gmail.com>
> wrote:
>
>> Apologies in delay in getting back on this. It seems the Kinesis example
>> does not run on Spark 1.1.0 even when it is built using kinesis-acl profile
>> because of a dependency conflict in http client (same issue as
>> http://mail-archives.apache.org/mod_mbox/spark-dev/201409.mbox/%3CCAJOb8btdXks-7-spJJ5jMNw0XsnrjwDpCQqtjht1hUn6j4zb_g@mail.gmail.com%3E).
>> I had to add a later version of http client in kinesis-acl profile to make
>> it run. Then, the Kinesis example sets master as local so it does not
>> honour the MASTER environment variable as other examples do. Once I was
>> able to resolve these issues, I was finally able to reproduce the issue.
>> The example works fine in local mode but does not do anything when receiver
>> runs in remote workers.
>>
>> Spark streaming does not report any blocks received from the receivers
>> even though I can see the following lines in the app logs (I modified the
>> debug line to print size as well):
>>
>> 14/09/19 12:30:18 DEBUG ReceiverSupervisorImpl: Pushed block
>> input-0-1411129664668 in 15 ms
>> 14/09/19 12:30:18 DEBUG ReceiverSupervisorImpl: Reported block
>> input-0-1411129664668 of size 1
>>
>> Here are the screenshots of Spark admin: http://imgur.com/a/gWKYm
>>
>> I also ran other examples (custom receiver, etc) in both local and
>> distributed mode and they seem to be working fine.
>>
>> Any ideas?
>>
>> Thanks,
>> Aniket
>>
>> On 12 September 2014 02:49, Tathagata Das <ta...@gmail.com>
>> wrote:
>>
>>> This is very puzzling, given that this works in the local mode.
>>>
>>> Does running the kinesis example work with your spark-submit?
>>>
>>>
>>> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>>>
>>> The instructions are present in the streaming guide.
>>> https://github.com/apache/spark/blob/master/docs/streaming-kinesis-integration.md
>>>
>>> If that does not work on cluster, then I would see the streaming UI for
>>> the number records that are being received, and the stages page for whether
>>> jobs are being executed for every batch or not. Can tell use whether that
>>> is working well.
>>>
>>> Also ccing, chris fregly who wrote Kinesis integration.
>>>
>>> TD
>>>
>>>
>>>
>>>
>>> On Thu, Sep 11, 2014 at 4:51 AM, Aniket Bhatnagar <
>>> aniket.bhatnagar@gmail.com> wrote:
>>>
>>>> Hi all
>>>>
>>>> I am trying to run kinesis spark streaming application on a standalone
>>>> spark cluster. The job works find in local mode but when I submit it (using
>>>> spark-submit), it doesn't do anything. I enabled logs
>>>> for org.apache.spark.streaming.kinesis package and I regularly get the
>>>> following in worker logs:
>>>>
>>>> 14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
>>>> x.x.x.x:b88a9210-cbb9-4c31-8da7-35fd92faba09 stored 34 records for shardId
>>>> shardId-000000000000
>>>> 14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
>>>> x.x.x.x:b2e9c20f-470a-44fe-a036-630c776919fb stored 31 records for shardId
>>>> shardId-000000000001
>>>>
>>>> But the job does not perform any operations defined on DStream. To
>>>> investigate this further, I changed the kinesis-asl's KinesisUtils to
>>>> perform the following computation on the DStream created
>>>> using ssc.receiverStream(new KinesisReceiver...):
>>>>
>>>> stream.count().foreachRDD(rdd => rdd.foreach(tuple => logInfo("Emitted
>>>> " + tuple)))
>>>>
>>>> Even the above line does not results in any corresponding log entries
>>>> both in driver and worker logs. The only relevant logs that I could find in
>>>> driver logs are:
>>>> 14/09/11 11:40:58 INFO DAGScheduler: Stage 2 (foreach at
>>>> KinesisUtils.scala:68) finished in 0.398 s
>>>> 14/09/11 11:40:58 INFO SparkContext: Job finished: foreach at
>>>> KinesisUtils.scala:68, took 4.926449985 s
>>>> 14/09/11 11:40:58 INFO JobScheduler: Finished job streaming job
>>>> 1410435653000 ms.0 from job set of time 1410435653000 ms
>>>> 14/09/11 11:40:58 INFO JobScheduler: Starting job streaming job
>>>> 1410435653000 ms.1 from job set of time 1410435653000 ms
>>>> 14/09/11 11:40:58 INFO SparkContext: Starting job: foreach at
>>>> KinesisUtils.scala:68
>>>> 14/09/11 11:40:58 INFO DAGScheduler: Registering RDD 13 (union at
>>>> DStream.scala:489)
>>>> 14/09/11 11:40:58 INFO DAGScheduler: Got job 3 (foreach at
>>>> KinesisUtils.scala:68) with 2 output partitions (allowLocal=false)
>>>> 14/09/11 11:40:58 INFO DAGScheduler: Final stage: Stage 5(foreach at
>>>> KinesisUtils.scala:68)
>>>>
>>>> After the above logs, nothing shows up corresponding to KinesisUtils. I
>>>> am out of ideas on this one and any help on this would greatly appreciated.
>>>>
>>>> Thanks,
>>>> Aniket
>>>>
>>>
>>>
>>
>

Re: Spark streaming stops computing while the receiver keeps running without any errors reported

Posted by Aniket Bhatnagar <an...@gmail.com>.
Hi all

I was finally able to figure out why this streaming appeared stuck. The
reason was that I was running out of workers in my standalone deployment of
Spark. There was no feedback in the logs which is why it took a little time
for me to figure out.

However, now I am trying to run the same in yarn-client mode and this is
again giving the same problem. Is it possible to run out of workers in YARN
mode as well? If so, how can I figure that out?

Thanks,
Aniket

On 19 September 2014 18:07, Aniket Bhatnagar <an...@gmail.com>
wrote:

> Apologies in delay in getting back on this. It seems the Kinesis example
> does not run on Spark 1.1.0 even when it is built using kinesis-acl profile
> because of a dependency conflict in http client (same issue as
> http://mail-archives.apache.org/mod_mbox/spark-dev/201409.mbox/%3CCAJOb8btdXks-7-spJJ5jMNw0XsnrjwDpCQqtjht1hUn6j4zb_g@mail.gmail.com%3E).
> I had to add a later version of http client in kinesis-acl profile to make
> it run. Then, the Kinesis example sets master as local so it does not
> honour the MASTER environment variable as other examples do. Once I was
> able to resolve these issues, I was finally able to reproduce the issue.
> The example works fine in local mode but does not do anything when receiver
> runs in remote workers.
>
> Spark streaming does not report any blocks received from the receivers
> even though I can see the following lines in the app logs (I modified the
> debug line to print size as well):
>
> 14/09/19 12:30:18 DEBUG ReceiverSupervisorImpl: Pushed block
> input-0-1411129664668 in 15 ms
> 14/09/19 12:30:18 DEBUG ReceiverSupervisorImpl: Reported block
> input-0-1411129664668 of size 1
>
> Here are the screenshots of Spark admin: http://imgur.com/a/gWKYm
>
> I also ran other examples (custom receiver, etc) in both local and
> distributed mode and they seem to be working fine.
>
> Any ideas?
>
> Thanks,
> Aniket
>
> On 12 September 2014 02:49, Tathagata Das <ta...@gmail.com>
> wrote:
>
>> This is very puzzling, given that this works in the local mode.
>>
>> Does running the kinesis example work with your spark-submit?
>>
>>
>> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>>
>> The instructions are present in the streaming guide.
>> https://github.com/apache/spark/blob/master/docs/streaming-kinesis-integration.md
>>
>> If that does not work on cluster, then I would see the streaming UI for
>> the number records that are being received, and the stages page for whether
>> jobs are being executed for every batch or not. Can tell use whether that
>> is working well.
>>
>> Also ccing, chris fregly who wrote Kinesis integration.
>>
>> TD
>>
>>
>>
>>
>> On Thu, Sep 11, 2014 at 4:51 AM, Aniket Bhatnagar <
>> aniket.bhatnagar@gmail.com> wrote:
>>
>>> Hi all
>>>
>>> I am trying to run kinesis spark streaming application on a standalone
>>> spark cluster. The job works find in local mode but when I submit it (using
>>> spark-submit), it doesn't do anything. I enabled logs
>>> for org.apache.spark.streaming.kinesis package and I regularly get the
>>> following in worker logs:
>>>
>>> 14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
>>> x.x.x.x:b88a9210-cbb9-4c31-8da7-35fd92faba09 stored 34 records for shardId
>>> shardId-000000000000
>>> 14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
>>> x.x.x.x:b2e9c20f-470a-44fe-a036-630c776919fb stored 31 records for shardId
>>> shardId-000000000001
>>>
>>> But the job does not perform any operations defined on DStream. To
>>> investigate this further, I changed the kinesis-asl's KinesisUtils to
>>> perform the following computation on the DStream created
>>> using ssc.receiverStream(new KinesisReceiver...):
>>>
>>> stream.count().foreachRDD(rdd => rdd.foreach(tuple => logInfo("Emitted "
>>> + tuple)))
>>>
>>> Even the above line does not results in any corresponding log entries
>>> both in driver and worker logs. The only relevant logs that I could find in
>>> driver logs are:
>>> 14/09/11 11:40:58 INFO DAGScheduler: Stage 2 (foreach at
>>> KinesisUtils.scala:68) finished in 0.398 s
>>> 14/09/11 11:40:58 INFO SparkContext: Job finished: foreach at
>>> KinesisUtils.scala:68, took 4.926449985 s
>>> 14/09/11 11:40:58 INFO JobScheduler: Finished job streaming job
>>> 1410435653000 ms.0 from job set of time 1410435653000 ms
>>> 14/09/11 11:40:58 INFO JobScheduler: Starting job streaming job
>>> 1410435653000 ms.1 from job set of time 1410435653000 ms
>>> 14/09/11 11:40:58 INFO SparkContext: Starting job: foreach at
>>> KinesisUtils.scala:68
>>> 14/09/11 11:40:58 INFO DAGScheduler: Registering RDD 13 (union at
>>> DStream.scala:489)
>>> 14/09/11 11:40:58 INFO DAGScheduler: Got job 3 (foreach at
>>> KinesisUtils.scala:68) with 2 output partitions (allowLocal=false)
>>> 14/09/11 11:40:58 INFO DAGScheduler: Final stage: Stage 5(foreach at
>>> KinesisUtils.scala:68)
>>>
>>> After the above logs, nothing shows up corresponding to KinesisUtils. I
>>> am out of ideas on this one and any help on this would greatly appreciated.
>>>
>>> Thanks,
>>> Aniket
>>>
>>
>>
>

Re: Spark streaming stops computing while the receiver keeps running without any errors reported

Posted by Aniket Bhatnagar <an...@gmail.com>.
Apologies in delay in getting back on this. It seems the Kinesis example
does not run on Spark 1.1.0 even when it is built using kinesis-acl profile
because of a dependency conflict in http client (same issue as
http://mail-archives.apache.org/mod_mbox/spark-dev/201409.mbox/%3CCAJOb8btdXks-7-spJJ5jMNw0XsnrjwDpCQqtjht1hUn6j4zb_g@mail.gmail.com%3E).
I had to add a later version of http client in kinesis-acl profile to make
it run. Then, the Kinesis example sets master as local so it does not
honour the MASTER environment variable as other examples do. Once I was
able to resolve these issues, I was finally able to reproduce the issue.
The example works fine in local mode but does not do anything when receiver
runs in remote workers.

Spark streaming does not report any blocks received from the receivers even
though I can see the following lines in the app logs (I modified the debug
line to print size as well):

14/09/19 12:30:18 DEBUG ReceiverSupervisorImpl: Pushed block
input-0-1411129664668 in 15 ms
14/09/19 12:30:18 DEBUG ReceiverSupervisorImpl: Reported block
input-0-1411129664668 of size 1

Here are the screenshots of Spark admin: http://imgur.com/a/gWKYm

I also ran other examples (custom receiver, etc) in both local and
distributed mode and they seem to be working fine.

Any ideas?

Thanks,
Aniket

On 12 September 2014 02:49, Tathagata Das <ta...@gmail.com>
wrote:

> This is very puzzling, given that this works in the local mode.
>
> Does running the kinesis example work with your spark-submit?
>
>
> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>
> The instructions are present in the streaming guide.
> https://github.com/apache/spark/blob/master/docs/streaming-kinesis-integration.md
>
> If that does not work on cluster, then I would see the streaming UI for
> the number records that are being received, and the stages page for whether
> jobs are being executed for every batch or not. Can tell use whether that
> is working well.
>
> Also ccing, chris fregly who wrote Kinesis integration.
>
> TD
>
>
>
>
> On Thu, Sep 11, 2014 at 4:51 AM, Aniket Bhatnagar <
> aniket.bhatnagar@gmail.com> wrote:
>
>> Hi all
>>
>> I am trying to run kinesis spark streaming application on a standalone
>> spark cluster. The job works find in local mode but when I submit it (using
>> spark-submit), it doesn't do anything. I enabled logs
>> for org.apache.spark.streaming.kinesis package and I regularly get the
>> following in worker logs:
>>
>> 14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
>> x.x.x.x:b88a9210-cbb9-4c31-8da7-35fd92faba09 stored 34 records for shardId
>> shardId-000000000000
>> 14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
>> x.x.x.x:b2e9c20f-470a-44fe-a036-630c776919fb stored 31 records for shardId
>> shardId-000000000001
>>
>> But the job does not perform any operations defined on DStream. To
>> investigate this further, I changed the kinesis-asl's KinesisUtils to
>> perform the following computation on the DStream created
>> using ssc.receiverStream(new KinesisReceiver...):
>>
>> stream.count().foreachRDD(rdd => rdd.foreach(tuple => logInfo("Emitted "
>> + tuple)))
>>
>> Even the above line does not results in any corresponding log entries
>> both in driver and worker logs. The only relevant logs that I could find in
>> driver logs are:
>> 14/09/11 11:40:58 INFO DAGScheduler: Stage 2 (foreach at
>> KinesisUtils.scala:68) finished in 0.398 s
>> 14/09/11 11:40:58 INFO SparkContext: Job finished: foreach at
>> KinesisUtils.scala:68, took 4.926449985 s
>> 14/09/11 11:40:58 INFO JobScheduler: Finished job streaming job
>> 1410435653000 ms.0 from job set of time 1410435653000 ms
>> 14/09/11 11:40:58 INFO JobScheduler: Starting job streaming job
>> 1410435653000 ms.1 from job set of time 1410435653000 ms
>> 14/09/11 11:40:58 INFO SparkContext: Starting job: foreach at
>> KinesisUtils.scala:68
>> 14/09/11 11:40:58 INFO DAGScheduler: Registering RDD 13 (union at
>> DStream.scala:489)
>> 14/09/11 11:40:58 INFO DAGScheduler: Got job 3 (foreach at
>> KinesisUtils.scala:68) with 2 output partitions (allowLocal=false)
>> 14/09/11 11:40:58 INFO DAGScheduler: Final stage: Stage 5(foreach at
>> KinesisUtils.scala:68)
>>
>> After the above logs, nothing shows up corresponding to KinesisUtils. I
>> am out of ideas on this one and any help on this would greatly appreciated.
>>
>> Thanks,
>> Aniket
>>
>
>

Re: Spark streaming stops computing while the receiver keeps running without any errors reported

Posted by Tathagata Das <ta...@gmail.com>.
This is very puzzling, given that this works in the local mode.

Does running the kinesis example work with your spark-submit?

https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

The instructions are present in the streaming guide.
https://github.com/apache/spark/blob/master/docs/streaming-kinesis-integration.md

If that does not work on cluster, then I would see the streaming UI for the
number records that are being received, and the stages page for whether
jobs are being executed for every batch or not. Can tell use whether that
is working well.

Also ccing, chris fregly who wrote Kinesis integration.

TD




On Thu, Sep 11, 2014 at 4:51 AM, Aniket Bhatnagar <
aniket.bhatnagar@gmail.com> wrote:

> Hi all
>
> I am trying to run kinesis spark streaming application on a standalone
> spark cluster. The job works find in local mode but when I submit it (using
> spark-submit), it doesn't do anything. I enabled logs
> for org.apache.spark.streaming.kinesis package and I regularly get the
> following in worker logs:
>
> 14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
> x.x.x.x:b88a9210-cbb9-4c31-8da7-35fd92faba09 stored 34 records for shardId
> shardId-000000000000
> 14/09/11 11:41:25 DEBUG KinesisRecordProcessor: Stored:  Worker
> x.x.x.x:b2e9c20f-470a-44fe-a036-630c776919fb stored 31 records for shardId
> shardId-000000000001
>
> But the job does not perform any operations defined on DStream. To
> investigate this further, I changed the kinesis-asl's KinesisUtils to
> perform the following computation on the DStream created
> using ssc.receiverStream(new KinesisReceiver...):
>
> stream.count().foreachRDD(rdd => rdd.foreach(tuple => logInfo("Emitted " +
> tuple)))
>
> Even the above line does not results in any corresponding log entries both
> in driver and worker logs. The only relevant logs that I could find in
> driver logs are:
> 14/09/11 11:40:58 INFO DAGScheduler: Stage 2 (foreach at
> KinesisUtils.scala:68) finished in 0.398 s
> 14/09/11 11:40:58 INFO SparkContext: Job finished: foreach at
> KinesisUtils.scala:68, took 4.926449985 s
> 14/09/11 11:40:58 INFO JobScheduler: Finished job streaming job
> 1410435653000 ms.0 from job set of time 1410435653000 ms
> 14/09/11 11:40:58 INFO JobScheduler: Starting job streaming job
> 1410435653000 ms.1 from job set of time 1410435653000 ms
> 14/09/11 11:40:58 INFO SparkContext: Starting job: foreach at
> KinesisUtils.scala:68
> 14/09/11 11:40:58 INFO DAGScheduler: Registering RDD 13 (union at
> DStream.scala:489)
> 14/09/11 11:40:58 INFO DAGScheduler: Got job 3 (foreach at
> KinesisUtils.scala:68) with 2 output partitions (allowLocal=false)
> 14/09/11 11:40:58 INFO DAGScheduler: Final stage: Stage 5(foreach at
> KinesisUtils.scala:68)
>
> After the above logs, nothing shows up corresponding to KinesisUtils. I am
> out of ideas on this one and any help on this would greatly appreciated.
>
> Thanks,
> Aniket
>