You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by "A.K.M. Ashrafuzzaman" <as...@gmail.com> on 2014/11/26 13:23:05 UTC

Having problem with Spark streaming with Kinesis

Hi guys,
When we are using Kinesis with 1 shard then it works fine. But when we use more that 1 then it falls into an infinite loop and no data is processed by the spark streaming. In the kinesis dynamo DB, I can see that it keeps increasing the leaseCounter. But it do start processing.

I am using,
scala: 2.10.4
java version: 1.8.0_25
Spark: 1.1.0
spark-streaming-kinesis-asl: 1.1.0

A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred

(M) 880-175-5592433
Twitter | Blog | Facebook

Check out The Academy, your #1 source
for free content marketing resources


Re: Having problem with Spark streaming with Kinesis

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
I have it working without any issues (tried with 5 shrads), except my java
version was 1.7.

Here's the piece of code that i used.

      System.setProperty("AWS_ACCESS_KEY_ID",
this.kConf.getOrElse("access_key", ""))
System.setProperty("AWS_SECRET_KEY", this.kConf.getOrElse("secret",
""))      val streamName = this.kConf.getOrElse("stream", "")      val
endpointUrl = this.kConf.getOrElse("end_point","https://kinesis.us-east-1.amazonaws.com/")
     val kinesisClient = new AmazonKinesisClient(new
DefaultAWSCredentialsProviderChain())
kinesisClient.setEndpoint(endpointUrl)      *val numShards =
kinesisClient.describeStream(streamName).getStreamDescription().getShards()
        .size()*      val numStreams = numShards      val
kinesisCheckpointInterval = Seconds(this.kConf.getOrElse("duration",
"").toInt)      val kinesisStreams = (0 until numStreams).map { i =>
     KinesisUtils.createStream(ssc, streamName, endpointUrl,
kinesisCheckpointInterval,          InitialPositionInStream.LATEST,
StorageLevel.MEMORY_AND_DISK_2)      }      /* Union all the streams
*/      val unionStreams = ssc.union(kinesisStreams)      val
tmp_stream = unionStreams.map(byteArray => new String(byteArray))

      tmp_stream.print()




Thanks
Best Regards

On Wed, Nov 26, 2014 at 5:53 PM, A.K.M. Ashrafuzzaman <
ashrafuzzaman.g2@gmail.com> wrote:

> Hi guys,
> When we are using Kinesis with 1 shard then it works fine. But when we use
> more that 1 then it falls into an infinite loop and no data is processed by
> the spark streaming. In the kinesis dynamo DB, I can see that it keeps
> increasing the leaseCounter. But it do start processing.
>
> I am using,
> scala: 2.10.4
> java version: 1.8.0_25
> Spark: 1.1.0
> spark-streaming-kinesis-asl: 1.1.0
>
> A.K.M. Ashrafuzzaman
> Lead Software Engineer
> NewsCred <http://www.newscred.com/>
>
> (M) 880-175-5592433
> Twitter <https://twitter.com/ashrafuzzaman> | Blog
> <http://jitu-blog.blogspot.com/> | Facebook
> <https://www.facebook.com/ashrafuzzaman.jitu>
>
> Check out The Academy <http://newscred.com/theacademy>, your #1 source
> for free content marketing resources
>
>

Re: Having problem with Spark streaming with Kinesis

Posted by "A.K.M. Ashrafuzzaman" <as...@gmail.com>.
Guys,
In my local machine it consumes a stream of Kinesis with 3 shards. But in EC2 it does not consume from the stream. Later we found that the EC2 machine was of 2 cores and my local machine was of 4 cores. I am using a single machine and in spark standalone mode. And we got a larger machine from EC2 and now the kinesis is getting consumed.

4 cores Single machine -> works
2 cores Single machine -> does not work
2 cores 2 workers -> does not work

So my question is that do we need a cluster of (#KinesisShards + 1) workers to be able to consume from Kinesis?


A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred

(M) 880-175-5592433
Twitter | Blog | Facebook

Check out The Academy, your #1 source
for free content marketing resources

On Nov 27, 2014, at 10:28 AM, Aniket Bhatnagar <an...@gmail.com> wrote:

> Did you set spark master as local[*]? If so, then it means that nunber of executors is equal to number of cores of the machine. Perhaps your mac machine has more cores (certainly more than number of kinesis shards +1).
> 
> Try explicitly setting master as local[N] where N is number of kinesis shards + 1. It should then work on both the machines.
> 
> On Thu, Nov 27, 2014, 9:46 AM Ashrafuzzaman <as...@gmail.com> wrote:
> I was trying in one machine with just sbt run.
> 
> And it is working with my mac environment with the same configuration.
> 
> I used the sample code from https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
> 
> 
> val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
> kinesisClient.setEndpoint(endpointUrl)
> val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards()
>   .size()
> 
> /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
> val numStreams = numShards
> 
> /* Setup the and SparkConfig and StreamingContext */
> /* Spark Streaming batch interval */
> val batchInterval = Milliseconds(2000)
> val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
> val ssc = new StreamingContext(sparkConfig, batchInterval)
> 
> /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
> val kinesisCheckpointInterval = batchInterval
> 
> /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
> val kinesisStreams = (0 until numStreams).map { i =>
>   KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
>       InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
> }
> 
> /* Union all the streams */
> val unionStreams = ssc.union(kinesisStreams)
> 
> /* Convert each line of Array[Byte] to String, split into words, and count them */
> val words = unionStreams.flatMap(byteArray => new String(byteArray)
>   .split(" "))
> 
> /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
> val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
> 
> /* Print the first 10 wordCounts */
> wordCounts.print()
> 
> /* Start the streaming context and await termination */
> ssc.start()
> ssc.awaitTermination()
> 
> 
> 
> A.K.M. Ashrafuzzaman
> Lead Software Engineer
> NewsCred
> 
> (M) 880-175-5592433
> Twitter | Blog | Facebook
> 
> Check out The Academy, your #1 source
> for free content marketing resources
> 
> On Wed, Nov 26, 2014 at 11:26 PM, Aniket Bhatnagar <an...@gmail.com> wrote:
> What's your cluster size? For streamig to work, it needs shards + 1 executors.
> 
> On Wed, Nov 26, 2014, 5:53 PM A.K.M. Ashrafuzzaman <as...@gmail.com> wrote:
> Hi guys,
> When we are using Kinesis with 1 shard then it works fine. But when we use more that 1 then it falls into an infinite loop and no data is processed by the spark streaming. In the kinesis dynamo DB, I can see that it keeps increasing the leaseCounter. But it do start processing.
> 
> I am using,
> scala: 2.10.4
> java version: 1.8.0_25
> Spark: 1.1.0
> spark-streaming-kinesis-asl: 1.1.0
> 
> A.K.M. Ashrafuzzaman
> Lead Software Engineer
> NewsCred
> 
> (M) 880-175-5592433
> Twitter | Blog | Facebook
> 
> Check out The Academy, your #1 source
> for free content marketing resources
> 
> 


Re: Having problem with Spark streaming with Kinesis

Posted by Aniket Bhatnagar <an...@gmail.com>.
Did you set spark master as local[*]? If so, then it means that nunber of
executors is equal to number of cores of the machine. Perhaps your mac
machine has more cores (certainly more than number of kinesis shards +1).

Try explicitly setting master as local[N] where N is number of kinesis
shards + 1. It should then work on both the machines.

On Thu, Nov 27, 2014, 9:46 AM Ashrafuzzaman <as...@gmail.com>
wrote:

> I was trying in one machine with just sbt run.
>
> And it is working with my mac environment with the same configuration.
>
> I used the sample code from
> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>
>
> val kinesisClient = new AmazonKinesisClient(new
> DefaultAWSCredentialsProviderChain())
> kinesisClient.setEndpoint(endpointUrl)
> val numShards =
> kinesisClient.describeStream(streamName).getStreamDescription().getShards()
>   .size()
>
> /* In this example, we're going to create 1 Kinesis
> Worker/Receiver/DStream for each shard. */
> val numStreams = numShards
>
> /* Setup the and SparkConfig and StreamingContext */
> /* Spark Streaming batch interval */
> val batchInterval = Milliseconds(2000)
> val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
> val ssc = new StreamingContext(sparkConfig, batchInterval)
>
> /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
> val kinesisCheckpointInterval = batchInterval
>
> /* Create the same number of Kinesis DStreams/Receivers as Kinesis
> stream's shards */
> val kinesisStreams = (0 until numStreams).map { i =>
>   KinesisUtils.createStream(ssc, streamName, endpointUrl,
> kinesisCheckpointInterval,
>       InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
> }
>
> /* Union all the streams */
> val unionStreams = ssc.union(kinesisStreams)
>
> /* Convert each line of Array[Byte] to String, split into words, and count
> them */
> val words = unionStreams.flatMap(byteArray => new String(byteArray)
>   .split(" "))
>
> /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
> val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
>
> /* Print the first 10 wordCounts */
> wordCounts.print()
>
> /* Start the streaming context and await termination */
> ssc.start()
> ssc.awaitTermination()
>
>
>
> A.K.M. Ashrafuzzaman
> Lead Software Engineer
> NewsCred <http://www.newscred.com>
>
> (M) 880-175-5592433
> Twitter <https://twitter.com/ashrafuzzaman> | Blog
> <http://jitu-blog.blogspot.com/> | Facebook
> <https://www.facebook.com/ashrafuzzaman.jitu>
>
> Check out The Academy <http://newscred.com/theacademy>, your #1 source
> for free content marketing resources
>
> On Wed, Nov 26, 2014 at 11:26 PM, Aniket Bhatnagar <
> aniket.bhatnagar@gmail.com> wrote:
>
>> What's your cluster size? For streamig to work, it needs shards + 1
>> executors.
>>
>> On Wed, Nov 26, 2014, 5:53 PM A.K.M. Ashrafuzzaman <
>> ashrafuzzaman.g2@gmail.com> wrote:
>>
>>> Hi guys,
>>> When we are using Kinesis with 1 shard then it works fine. But when we
>>> use more that 1 then it falls into an infinite loop and no data is
>>> processed by the spark streaming. In the kinesis dynamo DB, I can see that
>>> it keeps increasing the leaseCounter. But it do start processing.
>>>
>>> I am using,
>>> scala: 2.10.4
>>> java version: 1.8.0_25
>>> Spark: 1.1.0
>>> spark-streaming-kinesis-asl: 1.1.0
>>>
>>> A.K.M. Ashrafuzzaman
>>> Lead Software Engineer
>>> NewsCred <http://www.newscred.com/>
>>>
>>> (M) 880-175-5592433
>>> Twitter <https://twitter.com/ashrafuzzaman> | Blog
>>> <http://jitu-blog.blogspot.com/> | Facebook
>>> <https://www.facebook.com/ashrafuzzaman.jitu>
>>>
>>> Check out The Academy <http://newscred.com/theacademy>, your #1 source
>>> for free content marketing resources
>>>
>>>
>

Re: Having problem with Spark streaming with Kinesis

Posted by Ashrafuzzaman <as...@gmail.com>.
I was trying in one machine with just sbt run.

And it is working with my mac environment with the same configuration.

I used the sample code from
https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala


val kinesisClient = new AmazonKinesisClient(new
DefaultAWSCredentialsProviderChain())
kinesisClient.setEndpoint(endpointUrl)
val numShards =
kinesisClient.describeStream(streamName).getStreamDescription().getShards()
  .size()

/* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream
for each shard. */
val numStreams = numShards

/* Setup the and SparkConfig and StreamingContext */
/* Spark Streaming batch interval */
val batchInterval = Milliseconds(2000)
val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
val ssc = new StreamingContext(sparkConfig, batchInterval)

/* Kinesis checkpoint interval.  Same as batchInterval for this example. */
val kinesisCheckpointInterval = batchInterval

/* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's
shards */
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl,
kinesisCheckpointInterval,
      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
}

/* Union all the streams */
val unionStreams = ssc.union(kinesisStreams)

/* Convert each line of Array[Byte] to String, split into words, and count
them */
val words = unionStreams.flatMap(byteArray => new String(byteArray)
  .split(" "))

/* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

/* Print the first 10 wordCounts */
wordCounts.print()

/* Start the streaming context and await termination */
ssc.start()
ssc.awaitTermination()



A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred <http://www.newscred.com>

(M) 880-175-5592433
Twitter <https://twitter.com/ashrafuzzaman> | Blog
<http://jitu-blog.blogspot.com/> | Facebook
<https://www.facebook.com/ashrafuzzaman.jitu>

Check out The Academy <http://newscred.com/theacademy>, your #1 source
for free content marketing resources

On Wed, Nov 26, 2014 at 11:26 PM, Aniket Bhatnagar <
aniket.bhatnagar@gmail.com> wrote:

> What's your cluster size? For streamig to work, it needs shards + 1
> executors.
>
> On Wed, Nov 26, 2014, 5:53 PM A.K.M. Ashrafuzzaman <
> ashrafuzzaman.g2@gmail.com> wrote:
>
>> Hi guys,
>> When we are using Kinesis with 1 shard then it works fine. But when we
>> use more that 1 then it falls into an infinite loop and no data is
>> processed by the spark streaming. In the kinesis dynamo DB, I can see that
>> it keeps increasing the leaseCounter. But it do start processing.
>>
>> I am using,
>> scala: 2.10.4
>> java version: 1.8.0_25
>> Spark: 1.1.0
>> spark-streaming-kinesis-asl: 1.1.0
>>
>> A.K.M. Ashrafuzzaman
>> Lead Software Engineer
>> NewsCred <http://www.newscred.com/>
>>
>> (M) 880-175-5592433
>> Twitter <https://twitter.com/ashrafuzzaman> | Blog
>> <http://jitu-blog.blogspot.com/> | Facebook
>> <https://www.facebook.com/ashrafuzzaman.jitu>
>>
>> Check out The Academy <http://newscred.com/theacademy>, your #1 source
>> for free content marketing resources
>>
>>

Re: Having problem with Spark streaming with Kinesis

Posted by Aniket Bhatnagar <an...@gmail.com>.
What's your cluster size? For streamig to work, it needs shards + 1
executors.

On Wed, Nov 26, 2014, 5:53 PM A.K.M. Ashrafuzzaman <
ashrafuzzaman.g2@gmail.com> wrote:

> Hi guys,
> When we are using Kinesis with 1 shard then it works fine. But when we use
> more that 1 then it falls into an infinite loop and no data is processed by
> the spark streaming. In the kinesis dynamo DB, I can see that it keeps
> increasing the leaseCounter. But it do start processing.
>
> I am using,
> scala: 2.10.4
> java version: 1.8.0_25
> Spark: 1.1.0
> spark-streaming-kinesis-asl: 1.1.0
>
> A.K.M. Ashrafuzzaman
> Lead Software Engineer
> NewsCred <http://www.newscred.com/>
>
> (M) 880-175-5592433
> Twitter <https://twitter.com/ashrafuzzaman> | Blog
> <http://jitu-blog.blogspot.com/> | Facebook
> <https://www.facebook.com/ashrafuzzaman.jitu>
>
> Check out The Academy <http://newscred.com/theacademy>, your #1 source
> for free content marketing resources
>
>

Re: Having problem with Spark streaming with Kinesis

Posted by Ashrafuzzaman <as...@gmail.com>.
Thanks Aniket , clears a lot of confusion. 😄
On Dec 14, 2014 7:11 PM, "Aniket Bhatnagar" <an...@gmail.com>
wrote:

> The reason is because of the following code:
>
> val numStreams = numShards
> val kinesisStreams = (0 until numStreams).map { i =>
>   KinesisUtils.createStream(ssc, streamName, endpointUrl,
> kinesisCheckpointInterval,
>       InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
> }
>
> In the above code, numStreams is set as numShards. This enforces the need
> to have #shards + 1 workers. If you set numStreams as Math.min(numShards,
> numAvailableWorkers - 1), you can have lesser number of workers than number
> of shards. Makes sense?
>
> On Sun Dec 14 2014 at 10:06:36 A.K.M. Ashrafuzzaman <
> ashrafuzzaman.g2@gmail.com> wrote:
>
>> Thanks Aniket,
>> The trick is to have the #workers >= #shards + 1. But I don’t know why is
>> that.
>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>
>> Here in the figure[spark streaming kinesis architecture], it seems like
>> one node should be able to take on more than one shards.
>>
>>
>> A.K.M. Ashrafuzzaman
>> Lead Software Engineer
>> NewsCred <http://www.newscred.com/>
>>
>> (M) 880-175-5592433
>> Twitter <https://twitter.com/ashrafuzzaman> | Blog
>> <http://jitu-blog.blogspot.com/> | Facebook
>> <https://www.facebook.com/ashrafuzzaman.jitu>
>>
>> Check out The Academy <http://newscred.com/theacademy>, your #1 source
>> for free content marketing resources
>>
>> On Nov 26, 2014, at 6:23 PM, A.K.M. Ashrafuzzaman <
>> ashrafuzzaman.g2@gmail.com> wrote:
>>
>> Hi guys,
>> When we are using Kinesis with 1 shard then it works fine. But when we
>> use more that 1 then it falls into an infinite loop and no data is
>> processed by the spark streaming. In the kinesis dynamo DB, I can see that
>> it keeps increasing the leaseCounter. But it do start processing.
>>
>> I am using,
>> scala: 2.10.4
>> java version: 1.8.0_25
>> Spark: 1.1.0
>> spark-streaming-kinesis-asl: 1.1.0
>>
>> A.K.M. Ashrafuzzaman
>> Lead Software Engineer
>> NewsCred <http://www.newscred.com/>
>>
>> (M) 880-175-5592433
>> Twitter <https://twitter.com/ashrafuzzaman> | Blog
>> <http://jitu-blog.blogspot.com/> | Facebook
>> <https://www.facebook.com/ashrafuzzaman.jitu>
>>
>> Check out The Academy <http://newscred.com/theacademy>, your #1 source
>> for free content marketing resources
>>
>>
>>

Re: Having problem with Spark streaming with Kinesis

Posted by Aniket Bhatnagar <an...@gmail.com>.
The reason is because of the following code:

val numStreams = numShards
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl,
kinesisCheckpointInterval,
      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
}

In the above code, numStreams is set as numShards. This enforces the need
to have #shards + 1 workers. If you set numStreams as Math.min(numShards,
numAvailableWorkers - 1), you can have lesser number of workers than number
of shards. Makes sense?

On Sun Dec 14 2014 at 10:06:36 A.K.M. Ashrafuzzaman <
ashrafuzzaman.g2@gmail.com> wrote:

> Thanks Aniket,
> The trick is to have the #workers >= #shards + 1. But I don’t know why is
> that.
> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>
> Here in the figure[spark streaming kinesis architecture], it seems like
> one node should be able to take on more than one shards.
>
>
> A.K.M. Ashrafuzzaman
> Lead Software Engineer
> NewsCred <http://www.newscred.com/>
>
> (M) 880-175-5592433
> Twitter <https://twitter.com/ashrafuzzaman> | Blog
> <http://jitu-blog.blogspot.com/> | Facebook
> <https://www.facebook.com/ashrafuzzaman.jitu>
>
> Check out The Academy <http://newscred.com/theacademy>, your #1 source
> for free content marketing resources
>
> On Nov 26, 2014, at 6:23 PM, A.K.M. Ashrafuzzaman <
> ashrafuzzaman.g2@gmail.com> wrote:
>
> Hi guys,
> When we are using Kinesis with 1 shard then it works fine. But when we use
> more that 1 then it falls into an infinite loop and no data is processed by
> the spark streaming. In the kinesis dynamo DB, I can see that it keeps
> increasing the leaseCounter. But it do start processing.
>
> I am using,
> scala: 2.10.4
> java version: 1.8.0_25
> Spark: 1.1.0
> spark-streaming-kinesis-asl: 1.1.0
>
> A.K.M. Ashrafuzzaman
> Lead Software Engineer
> NewsCred <http://www.newscred.com/>
>
> (M) 880-175-5592433
> Twitter <https://twitter.com/ashrafuzzaman> | Blog
> <http://jitu-blog.blogspot.com/> | Facebook
> <https://www.facebook.com/ashrafuzzaman.jitu>
>
> Check out The Academy <http://newscred.com/theacademy>, your #1 source
> for free content marketing resources
>
>
>

Re: Having problem with Spark streaming with Kinesis

Posted by "A.K.M. Ashrafuzzaman" <as...@gmail.com>.
Thanks Aniket,
The trick is to have the #workers >= #shards + 1. But I don’t know why is that.
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html

Here in the figure[spark streaming kinesis architecture], it seems like one node should be able to take on more than one shards.


A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred

(M) 880-175-5592433
Twitter | Blog | Facebook

Check out The Academy, your #1 source
for free content marketing resources

On Nov 26, 2014, at 6:23 PM, A.K.M. Ashrafuzzaman <as...@gmail.com> wrote:

> Hi guys,
> When we are using Kinesis with 1 shard then it works fine. But when we use more that 1 then it falls into an infinite loop and no data is processed by the spark streaming. In the kinesis dynamo DB, I can see that it keeps increasing the leaseCounter. But it do start processing.
> 
> I am using,
> scala: 2.10.4
> java version: 1.8.0_25
> Spark: 1.1.0
> spark-streaming-kinesis-asl: 1.1.0
> 
> A.K.M. Ashrafuzzaman
> Lead Software Engineer
> NewsCred
> 
> (M) 880-175-5592433
> Twitter | Blog | Facebook
> 
> Check out The Academy, your #1 source
> for free content marketing resources
>