Posted to issues@spark.apache.org by "Zhaobo Yu (JIRA)" <ji...@apache.org> on 2018/10/12 23:05:00 UTC

[jira] [Updated] (SPARK-25721) maxRate configuration not being used in Kinesis receiver

     [ https://issues.apache.org/jira/browse/SPARK-25721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhaobo Yu updated SPARK-25721:
------------------------------
    Description: 
In the onStart() function of the KinesisReceiver class, the KinesisClientLibConfiguration object is initialized as follows:

val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
  checkpointAppName,
  streamName,
  kinesisProvider,
  dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
  cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
  workerId)
  .withKinesisEndpoint(endpointUrl)
  .withInitialPositionInStream(initialPositionInStream)
  .withTaskBackoffTimeMillis(500)
  .withRegionName(regionName)

 

As you can see, there is no withMaxRecords() call in this initialization, so KinesisClientLibConfiguration falls back to its hard-coded default of 10,000 records per fetch:

public static final int DEFAULT_MAX_RECORDS = 10000;
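
For reference, the maxRate in question is the standard Spark Streaming receiver setting; a minimal driver-side example is shown below (the value 500 is only an illustration, not a recommendation):

import org.apache.spark.SparkConf

// Driver-side configuration; 500 records/second/receiver is just an example value.
val conf = new SparkConf()
  .setAppName("KinesisExample")
  .set("spark.streaming.receiver.maxRate", "500")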

With that default in place, the receiver does not honor the configured maxRate at all. Worse still, it can trigger ProvisionedThroughputExceededException errors from Kinesis, especially when the streaming application is restarted and has a backlog to catch up on.
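
A sketch of one possible fix follows. It is only an illustration, not an actual patch: it assumes the receiver can read spark.streaming.receiver.maxRate from SparkEnv's SparkConf and simply uses it to cap each GetRecords call via withMaxRecords().

import org.apache.spark.SparkEnv
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration

// Hypothetical: treat the per-second rate limit as an upper bound on the
// number of records fetched per GetRecords call; keep the KCL default when
// no rate limit is configured.
val maxRate = SparkEnv.get.conf.getLong("spark.streaming.receiver.maxRate", 0L)
val maxRecordsPerFetch =
  if (maxRate > 0) {
    math.min(maxRate, KinesisClientLibConfiguration.DEFAULT_MAX_RECORDS).toInt
  } else {
    KinesisClientLibConfiguration.DEFAULT_MAX_RECORDS
  }

val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
  checkpointAppName,
  streamName,
  kinesisProvider,
  dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
  cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
  workerId)
  .withKinesisEndpoint(endpointUrl)
  .withInitialPositionInStream(initialPositionInStream)
  .withTaskBackoffTimeMillis(500)
  .withRegionName(regionName)
  .withMaxRecords(maxRecordsPerFetch)  // the call that is currently missing

Mapping a per-second limit directly onto a per-call limit is a simplification (the KCL can issue several GetRecords calls per second), but it would at least stop a restarted application from pulling 10,000 records per call while it catches up.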

 

Attached  


> maxRate configuration not being used in Kinesis receiver
> --------------------------------------------------------
>
>                 Key: SPARK-25721
>                 URL: https://issues.apache.org/jira/browse/SPARK-25721
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.2.0
>            Reporter: Zhaobo Yu
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org