Posted to issues@spark.apache.org by "Karthikeyan Ravi (Jira)" <ji...@apache.org> on 2019/09/16 19:29:00 UTC

[jira] [Updated] (SPARK-25721) maxRate configuration not being used in Kinesis receiver

     [ https://issues.apache.org/jira/browse/SPARK-25721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Karthikeyan Ravi updated SPARK-25721:
-------------------------------------
    Attachment: Screen Shot 2019-09-16 at 12.27.25 PM.png

> maxRate configuration not being used in Kinesis receiver
> --------------------------------------------------------
>
>                 Key: SPARK-25721
>                 URL: https://issues.apache.org/jira/browse/SPARK-25721
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.2.0
>            Reporter: Zhaobo Yu
>            Priority: Major
>         Attachments: Screen Shot 2019-09-16 at 12.27.25 PM.png, rate_violation.png
>
>
> In the onStart() function of the KinesisReceiver class, the KinesisClientLibConfiguration object is initialized as follows:
> val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
>     checkpointAppName,
>     streamName,
>     kinesisProvider,
>     dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
>     cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
>     workerId)
>   .withKinesisEndpoint(endpointUrl)
>   .withInitialPositionInStream(initialPositionInStream)
>   .withTaskBackoffTimeMillis(500)
>   .withRegionName(regionName)
>
> Note that withMaxRecords() is never called during initialization, so KinesisClientLibConfiguration falls back to its hard-coded default:
> public static final int DEFAULT_MAX_RECORDS = 10000;
> As a result, the receiver ignores any maxRate setting below 10,000. Worse, it can trigger ProvisionedThroughputExceededException from Kinesis, especially when the streaming application is restarted.
>  
> See the attached rate_violation.png: we have a Spark Streaming application with 40 receivers, each configured to consume 1 record per second. Over any 5-minute window the application should therefore consume at most 12,000 records (40 receivers * 1 record/s * 300 s = 12,000), but CloudWatch metrics show it consuming almost 22,000 records per 5 minutes.
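
A minimal sketch of the kind of change the report is asking for, not the actual Spark patch: chain withMaxRecords() onto the existing builder so the KCL fetch size respects the configured rate limit. `spark.streaming.receiver.maxRate` is the real Spark configuration key and `withMaxRecords` is a real KCL 1.x method; the surrounding variable names (`conf`, `kclConfig`, etc.) mirror the quoted snippet and are otherwise illustrative.

```scala
// Assumes `conf` is the SparkConf visible inside the receiver, and the
// other identifiers are the same ones used in KinesisReceiver.onStart().
// maxRate is records/second per receiver; 0 means "unlimited" in Spark.
val maxRate = conf.getInt("spark.streaming.receiver.maxRate", 0)

val kclConfig = new KinesisClientLibConfiguration(
    checkpointAppName,
    streamName,
    kinesisProvider,
    dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
    cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
    workerId)
  .withKinesisEndpoint(endpointUrl)
  .withInitialPositionInStream(initialPositionInStream)
  .withTaskBackoffTimeMillis(500)
  .withRegionName(regionName)

// Only override the hard-coded DEFAULT_MAX_RECORDS (10000) when a positive
// maxRate is configured, so each GetRecords call cannot exceed the limit.
val limitedConfig =
  if (maxRate > 0) kclConfig.withMaxRecords(maxRate) else kclConfig
```

Capping maxRecords at maxRate is a coarse bound (it limits records per GetRecords call, not strictly per second), but it prevents the receiver from pulling 10,000 records in a single fetch when the operator asked for far less.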



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org