Posted to dev@spark.apache.org by Yash Sharma <ya...@gmail.com> on 2016/02/23 02:58:41 UTC

Re: Spark not able to fetch events from Amazon Kinesis

Answering my own question:

I have had some success with the Spark Kinesis integration, and the key turned
out to be unionStreams.foreachRDD.

There are two versions of foreachRDD available:
- unionStreams.foreachRDD(rdd => ...)
- unionStreams.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => ...)

For some reason the first one does not get me any results, but changing to
the second one fetches the results as expected. I have yet to explore the
reason.

Adding a code snippet below for reference.

Hope it helps someone :)

Thanks everyone for the help.

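<code>
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.kinesis.KinesisUtils

// ssc (StreamingContext), numStreams, consumerName, streamName, endpointUrl,
// regionName and kinesisCheckpointInterval (Duration) are defined elsewhere.

// Create numStreams Kinesis receivers and union them into a single DStream.
val kinesisStreams = (0 until numStreams).map { _ =>
  KinesisUtils.createStream(
    ssc,
    consumerName,               // KCL application name
    streamName,
    endpointUrl,
    regionName,
    InitialPositionInStream.TRIM_HORIZON,
    kinesisCheckpointInterval,
    StorageLevel.MEMORY_AND_DISK_2
  )
}
val unionStreams = ssc.union(kinesisStreams)

println("========================")
println(s"Num of streams: ${numStreams}")
println("========================")

// Single-argument overload: returned no results in my setup.
/*unionStreams.foreachRDD { rdd =>
  println(rdd.count())
  println("rdd isempty:" + rdd.isEmpty())
}*/

// Two-argument overload (RDD plus batch Time): this one works, yeah!
unionStreams.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
  println(rdd.count())
  println("rdd isempty:" + rdd.isEmpty())
})

ssc.start()
ssc.awaitTermination()
</code>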


On Sun, Jan 31, 2016 at 12:11 PM, Yash Sharma <ya...@gmail.com> wrote:
>
> Thanks Burak,
> By any chance were you able to work around these errors or get the setup working? Is there anything else you might have tried?
>
> Regards
>
> On Sun, Jan 31, 2016 at 4:41 AM, Burak Yavuz <br...@gmail.com> wrote:
>>
>> Hi Yash,
>>
>> I've run into multiple problems due to version incompatibilities, with either protobuf or Jackson. That may be your culprit. The problem is that all failures in the Kinesis Client Library (KCL) are silent, so they don't show up in the logs. It's very hard to debug those buggers.
>>
>> Best,
>> Burak
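Because KCL failures may not show up in the logs, one way to look for the protobuf or Jackson conflicts mentioned above is to inspect the classpath directly. The following is a minimal sketch, not a Spark or KCL API: `DuplicateArtifactCheck` and `findVersions` are hypothetical names, and the approach assumes each copy of the library carries the standard Maven metadata file `META-INF/maven/<groupId>/<artifactId>/pom.properties`. It sticks to the JDK and the Scala standard library, so it should also compile on the Scala 2.10/2.11 versions used by Spark 1.5/1.6.

```scala
import java.net.URL
import java.util.Properties
import scala.collection.mutable.ListBuffer

// Hypothetical helper (not part of Spark or the KCL): lists every Maven
// pom.properties for groupId:artifactId that a class loader can see,
// together with the "version" recorded in it.
object DuplicateArtifactCheck {
  def findVersions(
      groupId: String,
      artifactId: String,
      loader: ClassLoader = Thread.currentThread.getContextClassLoader
  ): List[(URL, String)] = {
    val path = s"META-INF/maven/$groupId/$artifactId/pom.properties"
    // One URL per jar or directory on the classpath that contains this file.
    val urls = loader.getResources(path)
    val found = ListBuffer.empty[(URL, String)]
    while (urls.hasMoreElements) {
      val url = urls.nextElement()
      val props = new Properties()
      val in = url.openStream()
      try props.load(in) finally in.close()
      found += ((url, props.getProperty("version", "unknown")))
    }
    found.toList
  }
}
```

Calling `DuplicateArtifactCheck.findVersions("com.google.protobuf", "protobuf-java")` (or `"com.fasterxml.jackson.core", "jackson-databind"` for Jackson) and getting more than one entry back suggests that several versions are visible at once. An empty result proves little, though: shaded (relocated) copies and jars built without Maven metadata are invisible to this check.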
>>
>> On Sat, Jan 30, 2016 at 5:36 AM, Yash Sharma <ya...@gmail.com> wrote:
>>>
>>> Thanks Ted. Rebuilding Spark unfortunately isn't possible for this setup, so I just wanted to check whether the version mismatch is the primary issue here, and whether anyone has run into a similar issue and how they solved it.
>>>
>>> Thanks
>>>
>>> On Sat, Jan 30, 2016 at 10:23 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>
>>>> W.r.t. the protobuf-java version mismatch, I wonder if you could rebuild Spark with the following change (using Maven):
>>>>
>>>> http://pastebin.com/fVQAYWHM
>>>>
>>>> Cheers
>>>>
>>>> On Sat, Jan 30, 2016 at 12:49 AM, Yash Sharma <ya...@gmail.com> wrote:
>>>>>
>>>>> Hi all,
>>>>> I have a quick question, in case anyone here has experienced this.
>>>>>
>>>>> I have recently been trying to get Spark to read events from Kinesis, but I am having problems receiving them. While Spark is able to connect to Kinesis and get metadata from it, it is not able to get any events: it always fetches zero elements.
>>>>>
>>>>> There are no errors, just empty results. Spark is able to get metadata (e.g. the number of shards in the Kinesis stream).
>>>>>
>>>>> I have followed guides [1] and [2] to get it working but have not had much luck yet. I have also tried a couple of suggestions from Stack Overflow [3]. The cluster has sufficient resources/cores available.
>>>>>
>>>>> We have also seen a protobuf version conflict between Spark and Kinesis, which could be a cause of this behavior: Spark uses protobuf-java 2.5.0, while Kinesis probably uses protobuf-java-2.6.1.jar.
>>>>>
>>>>> Just wondering whether anyone has come across this behavior, or has got Spark working with Kinesis.
>>>>>
>>>>> I have tried with both Spark 1.5.0 and Spark 1.6.0.
>>>>>
>>>>> Appreciate any pointers.
>>>>>
>>>>> Best Regards,
>>>>> Yash
>>>>>
>>>>> 1. http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>>>> 2. https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>>>>> 3. http://stackoverflow.com/questions/26941844/apache-spark-kinesis-sample-not-working
>>>>>
>>>>
>>>
>>
>
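To test the protobuf-java conflict described in the original question (Spark on 2.5.0, the Kinesis libraries probably on 2.6.1), it can help to see which jar actually supplied the protobuf classes at runtime. A minimal sketch follows; `ClassOrigin.locate` is a hypothetical helper rather than a Spark or KCL API, and it assumes the check runs in the same JVM as the code under suspicion.

```scala
// Hypothetical helper (not part of Spark or the KCL): reports which jar or
// directory a class was actually loaded from.
object ClassOrigin {
  def locate(
      className: String,
      loader: ClassLoader = Thread.currentThread.getContextClassLoader
  ): Option[String] =
    try {
      // initialize = false: load the class without running its static initializers.
      val cls = Class.forName(className, false, loader)
      // Bootstrap (JDK) classes have no CodeSource, hence the Option wrapping.
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
    } catch {
      case _: ClassNotFoundException => None
    }
}
```

For example, `ClassOrigin.locate("com.google.protobuf.Message")` returns the location of the protobuf jar that won; if its version is not the one the Kinesis libraries expect, that supports the mismatch theory. Since the Kinesis receivers run on executors, it may be worth running the same check inside a task (e.g. from `rdd.foreachPartition`) as well as on the driver, because the two classpaths can differ.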