Posted to reviews@spark.apache.org by yssharma <gi...@git.apache.org> on 2017/03/29 12:22:35 UTC

[GitHub] spark pull request #17467: Ysharma/spark kinesis retries

GitHub user yssharma opened a pull request:

    https://github.com/apache/spark/pull/17467

    Ysharma/spark kinesis retries

    ## What changes were proposed in this pull request?
    
    This pull request proposes removing the hardcoded Amazon Kinesis retry values MIN_RETRY_WAIT_TIME_MS and MAX_RETRIES.
    
    This change is critical for Kinesis checkpoint recovery when the Kinesis-backed RDD is huge.
    The following happens in a typical Kinesis recovery:
    - Kinesis throttles a large number of requests while recovering.
    - Retries after throttling fail to recover because the wait period between them is too small.
    - Kinesis throttles per second, so the wait period should be configurable for recovery.
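The effect of a small fixed wait can be checked with a little arithmetic. Below is a minimal sketch, assuming a retry loop that does not sleep before the first attempt and doubles its wait before each later one (as in the `kinesisWaitTimeMs *= 2` line quoted further down this thread). `Backoff`, `backoffSchedule` and `totalWaitMs` are hypothetical helpers, not Spark APIs.

```scala
object Backoff {
  // Hypothetical helper: the sleeps a doubling-backoff retry loop performs when every
  // attempt is throttled. Assumes no sleep before the first attempt, then initialWaitMs,
  // 2 * initialWaitMs, 4 * initialWaitMs, ... before each later attempt.
  def backoffSchedule(initialWaitMs: Long, maxAttempts: Int): Seq[Long] =
    (0 until math.max(maxAttempts - 1, 0)).map(i => initialWaitMs << i)

  // Total time spent sleeping before the loop gives up (ignoring any overall timeout).
  def totalWaitMs(initialWaitMs: Long, maxAttempts: Int): Long =
    backoffSchedule(initialWaitMs, maxAttempts).sum
}
```

Under those assumptions, the hardcoded 100 ms and 3 attempts sleep only 100 + 200 = 300 ms in total, which fits inside a single per-second throttling window, whereas 2000 ms and 5 attempts (the values used in the test later in this thread) sleep 2 + 4 + 8 + 16 = 30 s. In practice the loop may also stop earlier when the retry timeout (the batch duration in the diffs) expires.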
    
    The patch reads the Spark Kinesis settings from these configuration keys:
    - spark.streaming.kinesis.retry.wait.time
    - spark.streaming.kinesis.retry.max.attempts
    
    Jira : https://issues.apache.org/jira/browse/SPARK-20140
    
    ## How was this patch tested?
    
    Modified KinesisBackedBlockRDDSuite.scala to run the Kinesis tests with the modified configurations. I wasn't able to test the patch against actual throttling.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yssharma/spark ysharma/spark-kinesis-retries

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17467.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17467
    
----
commit 67306cf76455c0ac080357d7aa7dbc4a5644896e
Author: Yash Sharma <ys...@atlassian.com>
Date:   2017-03-29T09:43:56Z

    Remove hardcoded retries for kinesis backed block rdd

commit 3aabde82e13de61ad5ec63b491854fb2576e97cc
Author: Yash Sharma <ys...@atlassian.com>
Date:   2017-03-29T12:16:32Z

    add testcase with the modified configurations

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112816822
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -295,6 +306,23 @@ class KinesisSequenceRangeIterator(
     
     private[streaming]
     object KinesisSequenceRangeIterator {
    -  val MAX_RETRIES = 3
    -  val MIN_RETRY_WAIT_TIME_MS = 100
    +  /**
    +   * The maximum number of attempts to be made to kinesis. Defaults to 3.
    +   */
    +  val MAX_RETRIES = "3"
    +
    +  /**
    +   * The interval between consequent kinesis retries. Defaults to 100ms.
    --- End diff --
    
    *nit:* **K**inesis


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r110437376
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -251,21 +255,22 @@ class KinesisSequenceRangeIterator(
     
       /** Helper method to retry Kinesis API request with exponential backoff and timeouts */
       private def retryOrTimeout[T](message: String)(body: => T): T = {
    -    import KinesisSequenceRangeIterator._
    -
    -    var startTimeMs = System.currentTimeMillis()
    +    val startTimeMs = System.currentTimeMillis()
         var retryCount = 0
    -    var waitTimeMs = MIN_RETRY_WAIT_TIME_MS
    +    var kinesisWaitTimeMs =
    +      kinesisConfigs.getOrElse("spark.streaming.kinesis.retry.wait.time", "100").toInt
    --- End diff --
    
    define this once during class initialization as well


[GitHub] spark issue #17467: Ysharma/spark kinesis retries

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    Hi @shaynativ, this PR looks like a non-trivial change that needs a JIRA. Please refer to http://spark.apache.org/contributing.html.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r115039620
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
    @@ -60,12 +61,19 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
           val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
           logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
               s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
    +
    +      /**
    +       * Construct the Kinesis read configs from streaming context
    +       * and pass to KinesisBackedBlockRDD
    +       */
    +      val kinesisReadConfigs = KinesisReadConfigurations(ssc)
    +
           new KinesisBackedBlockRDD(
             context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
             isBlockIdValid = isBlockIdValid,
    -        retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
             messageHandler = messageHandler,
    -        kinesisCreds = kinesisCreds)
    +        kinesisCreds = kinesisCreds,
    +        kinesisReadConfigs = kinesisReadConfigs)
    --- End diff --
    
    +1
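The design approved here builds one read-configuration object on the driver from the streaming context and hands it to `KinesisBackedBlockRDD`, instead of passing a raw `Map`. A minimal sketch of that idea follows, assuming the configuration is available as a plain `Map[String, String]` rather than a `SparkConf`. The field names mirror the `kinesisReadConfigs` assertions in the test quoted later in this thread; `ReadConfigsSketch`, the `maxAttempts` key name, and the millisecond-only parsing are assumptions (the actual patch imports `JavaUtils` to parse full duration strings).

```scala
// Hypothetical holder for the Kinesis read settings; not Spark's actual class.
// Case classes are Serializable by default, so the value can travel with the RDD.
case class ReadConfigsSketch(maxRetries: Int, retryWaitTimeMs: Long, retryTimeoutMs: Long)

object ReadConfigsSketch {
  // Key taken from the docs diff in this thread.
  val RetryWaitTimeKey = "spark.streaming.kinesis.retry.waitTime"
  // Assumed key name for the maximum number of attempts.
  val RetryMaxAttemptsKey = "spark.streaming.kinesis.retry.maxAttempts"

  /** Builds the settings once; the retry timeout is taken from the batch duration. */
  def fromConf(conf: Map[String, String], batchDurationMs: Long): ReadConfigsSketch =
    ReadConfigsSketch(
      maxRetries = conf.getOrElse(RetryMaxAttemptsKey, "3").toInt,
      // Sketch only: accepts "2000" or "2000ms", not other units such as "2s".
      retryWaitTimeMs = conf.getOrElse(RetryWaitTimeKey, "100ms").stripSuffix("ms").trim.toLong,
      retryTimeoutMs = batchDurationMs)
}
```

Keeping the settings in one immutable, serializable value matters because the RDD's non-transient fields are shipped to executors along with its tasks.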


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114855848
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kinesis
    +
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.streaming.StreamingContext
    +
    +/**
    + * Configurations to pass to the [KinesisBackedBlockRDD].
    --- End diff --
    
    +1


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @budde @brkyvz - Any feedback on this one, please?


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112765206
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -17,21 +17,24 @@
     
     package org.apache.spark.streaming.kinesis
     
    -import scala.collection.JavaConverters._
    -import scala.reflect.ClassTag
    -import scala.util.control.NonFatal
    -
    -import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
    +import com.amazonaws.auth.AWSCredentials
     import com.amazonaws.services.kinesis.AmazonKinesisClient
     import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
     import com.amazonaws.services.kinesis.model._
    -
    --- End diff --
    
    I think this newline should be kept to be consistent with the project's scalastyle. Have you been running style checks when testing this change?


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r110437838
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -83,7 +83,8 @@ class KinesisBackedBlockRDD[T: ClassTag](
         @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
         val retryTimeoutMs: Int = 10000,
         val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _,
    -    val kinesisCreds: SparkAWSCredentials = DefaultCredentials
    +    val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
    +    val kinesisConf: Map[String, String] = Map.empty
    --- End diff --
    
    do you need this to be provided as a `Map`? You already have `sc` being passed in with all the configurations


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    **[Test build #3634 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3634/testReport)** for PR 17467 at commit [`ccb6c19`](https://github.com/apache/spark/commit/ccb6c19dfaec4b7667552dfeed3da730923b7b64).


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112346805
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -147,6 +152,14 @@ class KinesisSequenceRangeIterator(
       private var lastSeqNumber: String = null
       private var internalIterator: Iterator[Record] = null
     
    +  // variable for kinesis wait time interval between next retry
    +  private var kinesisWaitTimeMs = JavaUtils.timeStringAsMs(
    --- End diff --
    
    No, this value is modified after waits -
    `kinesisWaitTimeMs *= 2  // if you have waited, then double wait time for next round`
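This reply explains why the class-level field could not simply become a `val` while it was being doubled in place. A common way out is to keep the configured starting wait immutable and double only a method-local copy. Below is a minimal sketch of such a loop, assuming throttling is signalled by a dedicated exception type. `Retrier` and `ThrottledException` are hypothetical stand-ins (the latter for Kinesis' `ProvisionedThroughputExceededException`), and the injectable `sleep` function exists only to make the sketch testable.

```scala
// Hypothetical stand-in for Kinesis' ProvisionedThroughputExceededException.
class ThrottledException(msg: String) extends RuntimeException(msg)

// Hypothetical retry helper: the configured starting wait is an immutable constructor
// value; only a method-local copy is doubled after each sleep.
class Retrier(
    initialWaitMs: Long,
    maxAttempts: Int,
    timeoutMs: Long,
    sleep: Long => Unit = (ms: Long) => Thread.sleep(ms)) {

  def retryOrTimeout[T](message: String)(body: => T): T = {
    val startTimeMs = System.currentTimeMillis()
    var waitTimeMs = initialWaitMs // local copy, so the field itself can stay a val
    var attempt = 0
    var result: Option[T] = None
    var lastError: Throwable = null

    def timedOut: Boolean = System.currentTimeMillis() - startTimeMs >= timeoutMs

    while (result.isEmpty && attempt < maxAttempts && !timedOut) {
      if (attempt > 0) { // sleep only before a retry, never before the first attempt
        sleep(waitTimeMs)
        waitTimeMs *= 2 // double the wait for the next round
      }
      try {
        result = Some(body)
      } catch {
        // Only throttling is retried; any other exception propagates immediately.
        case e: ThrottledException => lastError = e
      }
      attempt += 1
    }
    result.getOrElse(
      throw new RuntimeException(s"Gave up while $message after $attempt attempt(s)", lastError))
  }
}
```

The sketch retries only throttling errors and lets any other exception escape on the first failure; whether other errors should also be retried is a separate decision.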


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114856307
  
    --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala ---
    @@ -234,6 +235,53 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
         ssc.stop(stopSparkContext = false)
       }
     
    +  test("Kinesis read with custom configurations") {
    +    try {
    +      ssc.sc.conf.set(RETRY_WAIT_TIME_KEY, "2000ms")
    +      ssc.sc.conf.set(RETRY_MAX_ATTEMPTS_KEY, "5")
    +
    +      val kinesisStream = KinesisInputDStream.builder.streamingContext(ssc)
    +      .checkpointAppName(appName)
    +      .streamName("dummyStream")
    +      .endpointUrl(dummyEndpointUrl)
    +      .regionName(dummyRegionName)
    +      .initialPositionInStream(InitialPositionInStream.LATEST)
    +      .checkpointInterval(Seconds(10))
    +      .storageLevel(StorageLevel.MEMORY_ONLY)
    +      .build()
    +      .asInstanceOf[KinesisInputDStream[Array[Byte]]]
    +
    +      val time = Time(1000)
    +      // Generate block info data for testing
    +      val seqNumRanges1 = SequenceNumberRanges(
    +        SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67))
    +      val blockId1 = StreamBlockId(kinesisStream.id, 123)
    +      val blockInfo1 = ReceivedBlockInfo(
    +        0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
    +
    +      val seqNumRanges2 = SequenceNumberRanges(
    +        SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89))
    +      val blockId2 = StreamBlockId(kinesisStream.id, 345)
    +      val blockInfo2 = ReceivedBlockInfo(
    +        0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
    +
    +      // Verify that the generated KinesisBackedBlockRDD has the all the right information
    +      val blockInfos = Seq(blockInfo1, blockInfo2)
    +
    +      val kinesisRDD =
    +        kinesisStream.createBlockRDD(time, blockInfos).asInstanceOf[KinesisBackedBlockRDD[_]]
    +
    +      assert(kinesisRDD.kinesisReadConfigs.retryWaitTimeMs === 2000)
    +      assert(kinesisRDD.kinesisReadConfigs.maxRetries === 5)
    +      assert(kinesisRDD.kinesisReadConfigs.retryTimeoutMs === batchDuration.milliseconds)
    +
    +      ssc.sc.conf.remove(RETRY_WAIT_TIME_KEY)
    --- End diff --
    
    these also need to be in the `finally`
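Moving the cleanup into `finally` ensures a failed assertion cannot leak the custom settings into later tests that reuse the same SparkContext (the suite stops its streaming context with `stopSparkContext = false`). One way to make this hard to forget is a small loan-pattern helper that sets keys, runs the test body, and restores the previous values. The sketch below uses a `mutable.Map` as a stand-in for the shared `SparkConf`; `ConfTestHelpers.withConf` is a hypothetical helper, not an existing Spark test utility.

```scala
import scala.collection.mutable

object ConfTestHelpers {
  // Hypothetical loan-pattern helper: sets the given keys on a shared mutable config,
  // runs `body`, and restores the previous values in `finally`, so a failing assertion
  // inside `body` cannot leak settings into later tests.
  def withConf[T](conf: mutable.Map[String, String])(pairs: (String, String)*)(body: => T): T = {
    // Remember what each key held before (None means it was unset).
    val previous = pairs.map { case (key, _) => key -> conf.get(key) }
    pairs.foreach { case (key, value) => conf(key) = value }
    try body
    finally previous.foreach {
      case (key, Some(old)) => conf(key) = old
      case (key, None)      => conf.remove(key)
    }
  }
}
```

If adapted to `SparkConf`, the two `ssc.sc.conf.set` calls and their matching removals in the test above would collapse into a single `withConf(...) { ... }` wrapper.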


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @budde would you like to share your thoughts on the new changes when you have time?


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112839963
  
    --- Diff: docs/streaming-kinesis-integration.md ---
    @@ -216,3 +216,7 @@ de-aggregate records during consumption.
     - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
       - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
       - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
    +
    +#### Kinesis retry configurations
    --- End diff --
    
    *nit:* I think "configuration" sounds more natural here than "configurations"


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @budde @brkyvz - Implemented the review changes. Please review.
    
    - used SparkConf for all the user parameters
    - changed kinesisWait to be a val instead of a var
    - fixed the documentation


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112614624
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -83,7 +83,8 @@ class KinesisBackedBlockRDD[T: ClassTag](
         @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
         val retryTimeoutMs: Int = 10000,
         val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _,
    -    val kinesisCreds: SparkAWSCredentials = DefaultCredentials
    +    val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
    +    val kinesisConf: Map[String, String] = Map.empty
    --- End diff --
    
    +1


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @srowen - Could I get some love here as well? Thanks


[GitHub] spark issue #17467: Ysharma/spark kinesis retries

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    Oh right, you meant the PR title format. I will close this PR and post a new one. Thanks @HyukjinKwon


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114584052
  
    --- Diff: docs/streaming-kinesis-integration.md ---
    @@ -216,3 +216,7 @@ de-aggregate records during consumption.
     - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
       - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
       - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
    +
    +#### Kinesis retry configuration
    + - `spark.streaming.kinesis.retry.waitTime` : Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit 'ThroughputExceededExceptions', when consuming faster than 5 transactions/second or, exceeding the maximum read rate of 2 MB/second. This configuration can be tweaked to increase the sleep between fetches when a fetch fails to reduce these exceptions. Default is "100ms".
    --- End diff --
    
    `ProvisionedThroughputExceededException`s
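The documented default, "100ms", is a duration string rather than a bare number, and the patch converts it with `JavaUtils.timeStringAsMs`. To show what such a conversion involves, here is a minimal parser; `DurationStrings` is a hypothetical name, it is not Spark's implementation, and it supports only a handful of units (treating a bare number as milliseconds is this sketch's choice).

```scala
import java.util.concurrent.TimeUnit

// Hypothetical parser for a small subset of duration strings ("100ms", "2s", "1min").
// Spark's code path uses JavaUtils.timeStringAsMs; this is only an illustration.
object DurationStrings {
  private val Pattern = """(\d+)\s*(ms|s|min|m|h)?""".r

  def toMillis(str: String): Long = str.trim.toLowerCase match {
    case Pattern(num, unit) =>
      val n = num.toLong
      // A missing suffix leaves the group null; default it to milliseconds.
      Option(unit).getOrElse("ms") match {
        case "ms"        => n
        case "s"         => TimeUnit.SECONDS.toMillis(n)
        case "m" | "min" => TimeUnit.MINUTES.toMillis(n)
        case "h"         => TimeUnit.HOURS.toMillis(n)
      }
    case _ =>
      throw new IllegalArgumentException(s"Invalid duration string: '$str'")
  }
}
```

If parsing happens when the configuration object is built on the driver, as in the `KinesisReadConfigurations(ssc)` diff above, a malformed value fails fast there rather than inside a task.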


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114586962
  
    --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala ---
    @@ -234,6 +235,50 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
         ssc.stop(stopSparkContext = false)
       }
     
    +  test("Kinesis read with custom configurations") {
    +    ssc.sc.conf.set(RETRY_WAIT_TIME_KEY, "2000ms")
    +    ssc.sc.conf.set(RETRY_MAX_ATTEMPTS_KEY, "5")
    +
    +    val kinesisStream = KinesisInputDStream.builder.streamingContext(ssc)
    +    .checkpointAppName(appName)
    +    .streamName("dummyStream")
    +    .endpointUrl(dummyEndpointUrl)
    +    .regionName(dummyRegionName)
    +    .initialPositionInStream(InitialPositionInStream.LATEST)
    +    .checkpointInterval(Seconds(10))
    +    .storageLevel(StorageLevel.MEMORY_ONLY)
    +    .build()
    +    .asInstanceOf[KinesisInputDStream[Array[Byte]]]
    +
    +    val time = Time(1000)
    +    // Generate block info data for testing
    +    val seqNumRanges1 = SequenceNumberRanges(
    +      SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67))
    +    val blockId1 = StreamBlockId(kinesisStream.id, 123)
    +    val blockInfo1 = ReceivedBlockInfo(
    +      0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
    +
    +    val seqNumRanges2 = SequenceNumberRanges(
    +      SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89))
    +    val blockId2 = StreamBlockId(kinesisStream.id, 345)
    +    val blockInfo2 = ReceivedBlockInfo(
    +      0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
    +
    +    // Verify that the generated KinesisBackedBlockRDD has the all the right information
    +    val blockInfos = Seq(blockInfo1, blockInfo2)
    +
    +    val kinesisRDD =
    +      kinesisStream.createBlockRDD(time, blockInfos).asInstanceOf[KinesisBackedBlockRDD[_]]
    +
    +    assert(kinesisRDD.kinesisReadConfigs.retryWaitTimeMs === 2000)
    +    assert(kinesisRDD.kinesisReadConfigs.maxRetries === 5)
    +    assert(kinesisRDD.kinesisReadConfigs.retryTimeoutMs === batchDuration.milliseconds)
    +
    +    ssc.sc.conf.remove(RETRY_WAIT_TIME_KEY)
    --- End diff --
    
    mind putting these in a `try - finally` block


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114851396
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kinesis
    +
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.streaming.StreamingContext
    +
    +/**
    + * Configurations to pass to the [KinesisBackedBlockRDD].
    --- End diff --
    
    *nit*: should be ```[[KinesisBackedBlockRDD]]```


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r110437471
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -292,9 +297,3 @@ class KinesisSequenceRangeIterator(
         }
       }
     }
    -
    -private[streaming]
    -object KinesisSequenceRangeIterator {
    --- End diff --
    
    keep the default values here please


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @budde - Added changes for the minor review comments and docs.


[GitHub] spark issue #17467: Ysharma/spark kinesis retries

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    Hi @yssharma, this PR looks like a non-trivial change that needs a JIRA. Please refer to http://spark.apache.org/contributing.html.


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    I really prefer the case class. It's used in places where we don't pass in `SparkContext` as well
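
    A sketch of the trade-off discussed in this thread. Bundling the settings in one case class keeps a consumer's constructor signature fixed as settings are added. It also gives typed fields instead of string lookups in a `Map`. `ReadConfigs` and `RangeReader` are hypothetical names, not the classes in the PR.

    ```scala
    // Typed bundle of read settings (hypothetical name). Adding a field later does
    // not change the signature of classes that accept a ReadConfigs.
    case class ReadConfigs(maxRetries: Int, retryWaitTimeMs: Long, retryTimeoutMs: Long)

    // Hypothetical consumer: takes one typed argument instead of a growing
    // parameter list or an untyped Map[String, String].
    class RangeReader(val streamName: String, val configs: ReadConfigs) {
      // Typed access: no string keys or parsing at the use site.
      def describe: String =
        s"$streamName: up to ${configs.maxRetries} retries, " +
          s"${configs.retryWaitTimeMs} ms apart, ${configs.retryTimeoutMs} ms timeout"
    }
    ```

    Overriding a single value is then a `copy`, e.g. `base.copy(retryWaitTimeMs = 2000L)`, with the compiler checking the field name and type.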


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112848762
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator(
         endpointUrl: String,
         regionId: String,
         range: SequenceNumberRange,
    -    retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
    +    retryTimeoutMs: Int,
    +    sparkConf: SparkConf) extends NextIterator[Record] with Logging {
    --- End diff --
    
    And would you expect it to be passed directly to the `KinesisInputDStream`?


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    Thanks for the comments, @budde @brkyvz. I'll be adding the changes soon.
    I also liked pulling the values from the SparkConf directly, and got it working with a private val in `KinesisBackedBlockRDD` [1]. I don't mind getting the conf from the case class either, since it keeps all the configs in one place and the class acts as self-documenting code. Open to thoughts from you both.
    
    1. https://github.com/yssharma/spark/commit/f5026b4fb1bb98a0d31cfaa6571eee896051aa2b


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112848373
  
    --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala ---
    @@ -101,6 +103,36 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
         }
       }
     
    +  testIfEnabled("Basic reading from Kinesis with modified configurations") {
    +    // Add Kinesis retry configurations
    +    sc.conf.set(RETRY_WAIT_TIME_KEY, "1000ms")
    --- End diff --
    
    +1


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    Can I get some feedback here, please, @tdas @brkyvz? Thanks :)


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114856216
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kinesis
    +
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.streaming.StreamingContext
    +
    +/**
    + * Configurations to pass to the [KinesisBackedBlockRDD].
    + *
    + * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
    + * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
    + *                         Defaults to 100ms.
    + * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
    +*                         Defaults to batch duration provided for streaming,
    +*                         else uses 10000 if invoked directly.
    + */
    +private[kinesis] case class KinesisReadConfigurations(
    +                                                     maxRetries: Int,
    +                                                     retryWaitTimeMs: Long,
    +                                                     retryTimeoutMs: Long)
    +
    +object KinesisReadConfigurations {
    --- End diff --
    
    `private object`


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112816922
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -295,6 +306,23 @@ class KinesisSequenceRangeIterator(
     
     private[streaming]
     object KinesisSequenceRangeIterator {
    -  val MAX_RETRIES = 3
    -  val MIN_RETRY_WAIT_TIME_MS = 100
    +  /**
    +   * The maximum number of attempts to be made to kinesis. Defaults to 3.
    +   */
    +  val MAX_RETRIES = "3"
    +
    +  /**
    +   * The interval between consequent kinesis retries. Defaults to 100ms.
    +   */
    +  val MIN_RETRY_WAIT_TIME_MS = "100ms"
    +
    +  /**
    +   * Key for configuring the retry wait time for kinesis. The values can be passed to SparkConf.
    +   */
    +  val RETRY_WAIT_TIME_KEY = "spark.streaming.kinesis.retry.waitTime"
    +
    +  /**
    +   * Key for configuring the number of retries for kinesis. The values can be passed to SparkConf.
    --- End diff --
    
    *nit:* I'd make the following tweaks here:
    
    ```scala
    /**
     * SparkConf key for configuring the maximum number of retries used when attempting a Kinesis
     * request.
     */
    ```


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114584488
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -284,17 +282,12 @@ class KinesisSequenceRangeIterator(
         result.getOrElse {
           if (isTimedOut) {
             throw new SparkException(
    -          s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError)
    +          s"Timed out after ${kinesisReadConfigs.retryTimeoutMs} ms while "
    +          + "$message, last exception: ", lastError)
    --- End diff --
    
    Also, please move the `+` to the line above.
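
    Besides moving the `+`, note a second problem in the diff above. The second literal, `"$message, last exception: "`, has no `s` prefix, so `$message` would appear verbatim instead of being interpolated. Below is a sketch of the corrected concatenation. `timeoutMessage` is a hypothetical helper, used only to make the expression testable.

    ```scala
    // Builds the timeout message. Both literals carry the `s` interpolator, and
    // the `+` ends the first line, as requested in the review.
    def timeoutMessage(retryTimeoutMs: Long, message: String): String =
      s"Timed out after $retryTimeoutMs ms while " +
        s"$message, last exception: "
    ```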


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    You're a gem @HyukjinKwon 💯. I will wait for Tathagata's and Burak's input then :)


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112848363
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator(
         endpointUrl: String,
         regionId: String,
         range: SequenceNumberRange,
    -    retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
    +    retryTimeoutMs: Int,
    +    sparkConf: SparkConf) extends NextIterator[Record] with Logging {
    --- End diff --
    
    @brkyvz - I was thinking of not passing individual configs to the constructor, because that would just cause the parameter list to grow. Using SparkConf or a Map would let us add new configs without changing the constructor signature. I was using a Map earlier for this so that it's easy to pass more configs.
    What are your thoughts on Map vs. case class?



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r110437682
  
    --- Diff: docs/streaming-kinesis-integration.md ---
    @@ -216,3 +216,7 @@ de-aggregate records during consumption.
     - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
       - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
       - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
    +
    +- Kinesis retry configurations
    + - `spark.streaming.kinesis.retry.wait.time` : Config for wait time between Kinesis retries (in milliseconds). Default is 100 ms.
    --- End diff --
    
    We can take duration strings, such as `100ms` or `1s`; I would prefer we use those.
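
    Spark already parses duration strings such as `100ms` or `1s`. Examples are `SparkConf.getTimeAsMs` and `JavaUtils.timeStringAsMs`, and a later revision of this PR uses the latter. The sketch below shows the idea without a Spark dependency. It is a simplified, hypothetical parser, not Spark's implementation. It handles only the `ms`, `s`, `m`/`min` and `h` suffixes, and treats a bare number as milliseconds.

    ```scala
    import java.util.concurrent.TimeUnit

    // Simplified, hypothetical duration parser (NOT Spark's JavaUtils): accepts
    // "<digits><suffix>" where suffix is ms, s, m, min or h; no suffix means ms.
    object DurationParser {
      private val DurationRegex = """(\d+)\s*([a-z]*)""".r
      private val units = Map(
        "" -> TimeUnit.MILLISECONDS,
        "ms" -> TimeUnit.MILLISECONDS,
        "s" -> TimeUnit.SECONDS,
        "m" -> TimeUnit.MINUTES,
        "min" -> TimeUnit.MINUTES,
        "h" -> TimeUnit.HOURS)

      def toMillis(str: String): Long = str.trim.toLowerCase match {
        // The regex extractor only succeeds on a full match of the string.
        case DurationRegex(num, suffix) if units.contains(suffix) =>
          units(suffix).toMillis(num.toLong)
        case other =>
          throw new NumberFormatException(s"Invalid duration string: '$other'")
      }
    }
    ```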


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114856082
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kinesis
    +
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.streaming.StreamingContext
    +
    +/**
    + * Configurations to pass to the [KinesisBackedBlockRDD].
    + *
    + * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
    + * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
    + *                         Defaults to 100ms.
    + * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
    +*                         Defaults to batch duration provided for streaming,
    +*                         else uses 10000 if invoked directly.
    + */
    +private[kinesis] case class KinesisReadConfigurations(
    +                                                     maxRetries: Int,
    +                                                     retryWaitTimeMs: Long,
    +                                                     retryTimeoutMs: Long)
    +
    +object KinesisReadConfigurations {
    +  def apply(): KinesisReadConfigurations = {
    --- End diff --
    
    Actually, do we even need this?
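
    One possible answer: if the case class declares default values backed by named constants, a no-argument construction already compiles without a hand-written `apply()`. Here is a sketch under that assumption. The constant values mirror the defaults quoted in this thread (3 retries, 100 ms, 10000 ms). `ReadConfigsWithDefaults` and the constant names are hypothetical.

    ```scala
    // Hypothetical constants held in the companion, mirroring the thread's defaults.
    object ReadConfigsWithDefaults {
      val DefaultMaxRetries: Int = 3
      val DefaultRetryWaitTimeMs: Long = 100L
      val DefaultRetryTimeoutMs: Long = 10000L
    }

    // Default parameter values make an explicit no-arg apply() unnecessary:
    // the compiler-generated apply already accepts zero arguments.
    case class ReadConfigsWithDefaults(
        maxRetries: Int = ReadConfigsWithDefaults.DefaultMaxRetries,
        retryWaitTimeMs: Long = ReadConfigsWithDefaults.DefaultRetryWaitTimeMs,
        retryTimeoutMs: Long = ReadConfigsWithDefaults.DefaultRetryTimeoutMs)
    ```

    Callers can also override a single setting by name, e.g. `ReadConfigsWithDefaults(retryWaitTimeMs = 2000L)`.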


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    LGTM! Merging to master/branch-2.2


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112841727
  
    --- Diff: docs/streaming-kinesis-integration.md ---
    @@ -216,3 +216,7 @@ de-aggregate records during consumption.
     - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
       - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
       - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
    +
    +#### Kinesis retry configurations
    + - `spark.streaming.kinesis.retry.waitTime` : SparkConf for wait time between Kinesis retries (in milliseconds). Default is "100ms".
    --- End diff --
    
    Example: `Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit 'ProvisionedThroughputExceededException' errors when consuming faster than 2 MB/s per shard. This configuration can be tweaked to increase the sleep between fetches when a fetch fails, to reduce these exceptions.`
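
    The behaviour this setting tunes can be sketched as a retry loop. On a throttling failure, the loop sleeps for the configured wait time and retries, up to a maximum count. Once the retries are exhausted, it gives up. This sketch assumes the wait doubles after each failure. `ThrottledException` and `retryWithBackoff` are hypothetical names, not the PR's code or AWS SDK types. The real iterator also enforces an overall timeout, which is omitted here.

    ```scala
    // Hypothetical stand-in for a throttling error such as Kinesis'
    // ProvisionedThroughputExceededException.
    class ThrottledException(msg: String) extends RuntimeException(msg)

    // Runs `body`; on ThrottledException, sleeps and retries. The first wait is
    // `retryWaitTimeMs` and it doubles after every failed attempt. Gives up after
    // `maxRetries` retries (at most maxRetries + 1 attempts) by rethrowing.
    // Any other exception propagates immediately.
    def retryWithBackoff[T](maxRetries: Int, retryWaitTimeMs: Long)(body: => T): T = {
      var waitMs = retryWaitTimeMs
      var retries = 0
      var result: Option[T] = None
      while (result.isEmpty) {
        try {
          result = Some(body)
        } catch {
          case _: ThrottledException if retries < maxRetries =>
            Thread.sleep(waitMs)
            waitMs *= 2
            retries += 1
        }
      }
      result.get
    }
    ```

    With a larger `retryWaitTimeMs`, each retry backs off further, which is what gives a throttled shard time to recover.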


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114851170
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kinesis
    +
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.streaming.StreamingContext
    +
    +/**
    + * Configurations to pass to the [KinesisBackedBlockRDD].
    + *
    + * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
    + * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
    + *                         Defaults to 100ms.
    + * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
    +*                         Defaults to batch duration provided for streaming,
    +*                         else uses 10000 if invoked directly.
    + */
    +private[kinesis] case class KinesisReadConfigurations(
    +                                                     maxRetries: Int,
    +                                                     retryWaitTimeMs: Long,
    +                                                     retryTimeoutMs: Long)
    +
    +object KinesisReadConfigurations {
    +  def apply(): KinesisReadConfigurations = {
    +    KinesisReadConfigurations(3, 100, 10000)
    --- End diff --
    
    I would use constants and named parameters here too, e.g.
    
    ```scala
    def apply(): KinesisReadConfigurations = KinesisReadConfigurations(
      maxRetries = DEFAULT_MAX_RETRIES,
      ...
    ```


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114586572
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kinesis
    +
    +import org.apache.spark.network.util.JavaUtils
    +
    +/**
    + * Configurations to pass to the [KinesisBackedBlockRDD].
    + *
    + * @param maxRetriesOption : The maximum number of attempts to be made to Kinesis. Defaults to 3.
    + * @param retryWaitTimeMsOption : The interval between consequent Kinesis retries.
    + *                              Defaults to 100ms.
    + * @param retryTimeoutMsOption : The timeout in milliseconds for a Kinesis request.
    + *                             Defaults to batch duration provided for streaming,
    + *                             else uses 10000 if invoked directly.
    + */
    +private[kinesis]
    +case class KinesisReadConfigurations (
    --- End diff --
    
    How about something like this?
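    ```scala
    import org.apache.spark.network.util.JavaUtils
    import org.apache.spark.streaming.StreamingContext
    
    private[kinesis] case class KinesisReadConfigurations(
        maxRetries: Int,
        retryWaitTimeMs: Long,
        retryTimeoutMs: Long)
    
    private[kinesis] object KinesisReadConfigurations {
      // SparkConf keys and their defaults
      val RETRY_MAX_ATTEMPTS_KEY = "spark.streaming.kinesis.retry.maxAttempts"
      val RETRY_WAIT_TIME_KEY = "spark.streaming.kinesis.retry.waitTime"
      val DEFAULT_MAX_RETRIES = 3
      val DEFAULT_RETRY_WAIT_TIME = "100ms"
      val DEFAULT_RETRY_TIMEOUT = 10000L
    
      // Defaults, used when the RDD is created directly rather than from a stream
      def apply(): KinesisReadConfigurations = {
        KinesisReadConfigurations(
          DEFAULT_MAX_RETRIES,
          JavaUtils.timeStringAsMs(DEFAULT_RETRY_WAIT_TIME),
          DEFAULT_RETRY_TIMEOUT)
      }
    
      // Reads overrides from the SparkConf; the timeout follows the batch duration
      def apply(ssc: StreamingContext): KinesisReadConfigurations = {
        val conf = ssc.sc.getConf
        KinesisReadConfigurations(
          maxRetries = conf.getInt(RETRY_MAX_ATTEMPTS_KEY, DEFAULT_MAX_RETRIES),
          retryWaitTimeMs = JavaUtils.timeStringAsMs(
            conf.get(RETRY_WAIT_TIME_KEY, DEFAULT_RETRY_WAIT_TIME)),
          retryTimeoutMs = ssc.graph.batchDuration.milliseconds)
      }
    }
    ```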
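    The suggestion above turns a duration string such as "100ms" into milliseconds with `JavaUtils.timeStringAsMs` and falls back to defaults when a key is unset. Below is a minimal, Spark-free sketch of that lookup-with-defaults pattern. It assumes a plain `Map[String, String]` standing in for `SparkConf`; `ReadConfigSketch`, `parseDurationMs` and `fromSettings` are hypothetical names, and the parser is deliberately simplified to the `ms`, `s` and `min` suffixes (Spark's own parser accepts more units).
    
    ```scala
    // Sketch only: Map[String, String] stands in for SparkConf, and parseDurationMs
    // is a simplified, hypothetical stand-in for JavaUtils.timeStringAsMs.
    case class ReadConfigSketch(maxRetries: Int, retryWaitTimeMs: Long, retryTimeoutMs: Long)
    
    object ReadConfigSketch {
      val RetryMaxAttemptsKey = "spark.streaming.kinesis.retry.maxAttempts"
      val RetryWaitTimeKey = "spark.streaming.kinesis.retry.waitTime"
      val DefaultMaxRetries = 3
      val DefaultRetryWaitTime = "100ms"
    
      // A number followed by an optional unit suffix; the whole string must match.
      private val DurationPattern = """(\d+)\s*(ms|s|min)?""".r
    
      // Parses "250ms", "2s", "1min" or a bare number (treated as milliseconds).
      def parseDurationMs(str: String): Long = str.trim.toLowerCase match {
        case DurationPattern(value, unit) =>
          val n = value.toLong
          unit match {
            case null | "ms" => n
            case "s"         => n * 1000L
            case "min"       => n * 60L * 1000L
          }
        case other =>
          throw new IllegalArgumentException(s"Invalid duration: $other")
      }
    
      // Reads each setting once, falling back to the defaults when a key is absent.
      def fromSettings(settings: Map[String, String], batchDurationMs: Long): ReadConfigSketch = {
        val maxRetries =
          settings.get(RetryMaxAttemptsKey).map(_.trim.toInt).getOrElse(DefaultMaxRetries)
        val waitTimeMs =
          parseDurationMs(settings.getOrElse(RetryWaitTimeKey, DefaultRetryWaitTime))
        ReadConfigSketch(maxRetries, waitTimeMs, batchDurationMs)
      }
    }
    ```
    
    Resolving everything into plain numbers up front means whatever consumes the result only ever sees an `Int` and two `Long`s.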


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    Thanks for all the review comments, @budde @brkyvz. I've pushed changes addressing them.


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    I'm not a Spark committer, but I've contributed to this component in the past. I would strongly prefer an approach that avoids adding an additional parameter to all of the Kinesis classes if the ```SparkConf``` from ```sc``` can be used instead. I haven't looked at the initial version of your code, but based on the stack trace you've posted it seems like you might have been referencing ```sc``` directly from code running on the executor (e.g. the ```KinesisSequenceRangeIterator``` instance created in the ```compute()``` method via ```getBlockFromKinesis()```). Did you try simply extracting the two config values from ```sc```, storing them as fields of ```KinesisBackedBlockRDDPartition```, and then passing them as constructor arguments to ```KinesisSequenceRangeIterator```?
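    The approach suggested above (resolve the values on the driver, keep them on the partition, hand them to the iterator's constructor) can be sketched without Spark. In this sketch `RangePartitionSketch` and `RangeIteratorSketch` are hypothetical stand-ins for `KinesisBackedBlockRDDPartition` and `KinesisSequenceRangeIterator`, and a Java serialization round trip is assumed to approximate shipping a task to an executor.
    
    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    
    // Hypothetical stand-in for a partition: it carries only plain, serializable values.
    case class RangePartitionSketch(index: Int, maxRetries: Int, retryWaitTimeMs: Long)
    
    // Hypothetical stand-in for the executor-side iterator: it receives the values
    // as constructor arguments and never touches a driver-only context.
    class RangeIteratorSketch(val maxRetries: Int, val retryWaitTimeMs: Long)
    
    object PartitionShippingSketch {
      // Serializes and deserializes a value, roughly what happens when a task is sent out.
      def roundTrip[A <: java.io.Serializable](value: A): A = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(value)
        out.close()
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        try in.readObject().asInstanceOf[A] finally in.close()
      }
    
      // "Executor side": build the iterator from the deserialized partition's fields.
      def compute(partition: RangePartitionSketch): RangeIteratorSketch =
        new RangeIteratorSketch(partition.maxRetries, partition.retryWaitTimeMs)
    }
    ```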


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114922000
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
    @@ -60,12 +61,19 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
           val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
           logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
               s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
    +
    +      /**
    +       * Construct the Kinesis read configs from streaming context
    +       * and pass to KinesisBackedBlockRDD
    +       */
    +      val kinesisReadConfigs = KinesisReadConfigurations(ssc)
    +
           new KinesisBackedBlockRDD(
             context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
             isBlockIdValid = isBlockIdValid,
    -        retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
             messageHandler = messageHandler,
    -        kinesisCreds = kinesisCreds)
    +        kinesisCreds = kinesisCreds,
    +        kinesisReadConfigs = kinesisReadConfigs)
    --- End diff --
    
    I think it would be sufficient to change this to
    
    ```scala
      kinesisReadConfigs = KinesisReadConfigurations(ssc))
    ```
    
    and omit lines 65-70. I don't think a comment is necessary here; the code is pretty straightforward.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r110437249
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -251,21 +255,22 @@ class KinesisSequenceRangeIterator(
     
       /** Helper method to retry Kinesis API request with exponential backoff and timeouts */
       private def retryOrTimeout[T](message: String)(body: => T): T = {
    -    import KinesisSequenceRangeIterator._
    -
    -    var startTimeMs = System.currentTimeMillis()
    +    val startTimeMs = System.currentTimeMillis()
         var retryCount = 0
    -    var waitTimeMs = MIN_RETRY_WAIT_TIME_MS
    +    var kinesisWaitTimeMs =
    +      kinesisConfigs.getOrElse("spark.streaming.kinesis.retry.wait.time", "100").toInt
    +    val kinesisMaxRetries =
    --- End diff --
    
    Define this once, during class initialization, instead of re-reading it on every call.
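    A minimal sketch of that idea: read the retry settings once, when the object is constructed, and reuse them in the retry loop. `RetrySettings`, `RetryingClient` and `isRetryable` are hypothetical names. The loop doubles the wait after each failed attempt and stops on success, on a non-retryable error, after `maxRetries` attempts, or once the timeout has elapsed; it loosely mirrors the `retryOrTimeout` helper in the diff above but is not copied from it.
    
    ```scala
    import scala.util.control.NonFatal
    
    // Hypothetical settings holder; in the PR these values come from SparkConf.
    case class RetrySettings(maxRetries: Int, retryWaitTimeMs: Long, retryTimeoutMs: Long)
    
    class RetryingClient(settings: RetrySettings, isRetryable: Throwable => Boolean) {
      // Resolved once, at construction time, instead of on every call.
      private val maxRetries = settings.maxRetries
      private val initialWaitMs = settings.retryWaitTimeMs
      private val timeoutMs = settings.retryTimeoutMs
    
      def retryOrTimeout[T](message: String)(body: => T): T = {
        val startTimeMs = System.currentTimeMillis()
        var attempts = 0
        var waitMs = initialWaitMs
        var result: Option[T] = None
        var lastError: Throwable = null
    
        def isTimedOut: Boolean = System.currentTimeMillis() - startTimeMs >= timeoutMs
    
        while (result.isEmpty && attempts < maxRetries && !isTimedOut) {
          if (attempts > 0) {
            Thread.sleep(waitMs) // back off before a retry
            waitMs *= 2          // exponential backoff: double the wait each round
          }
          try {
            result = Some(body)
          } catch {
            // Only retryable errors are swallowed; anything else propagates immediately.
            case NonFatal(e) if isRetryable(e) => lastError = e
          }
          attempts += 1
        }
    
        result.getOrElse {
          val reason =
            if (isTimedOut) s"Timed out after $timeoutMs ms"
            else s"Gave up after $attempts attempts"
          throw new RuntimeException(s"$reason while $message", lastError)
        }
      }
    }
    ```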


[GitHub] spark issue #17467: Ysharma/spark kinesis retries

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    I've filed a JIRA ticket for this patch: https://issues.apache.org/jira/browse/SPARK-20140


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112347174
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
    @@ -249,6 +252,17 @@ object KinesisInputDStream {
         }
     
         /**
    +      * Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS CloudWatch
    +      * endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
    +      *
    +      * @param conf: Map[String, String] to use for CloudWatch authentication
    +      */
    +    def kinesisConf(conf: Map[String, String]): Builder = {
    --- End diff --
    
    Do you think it would be better to pass individual values to the builder rather than a map of configs? I thought a map of configs could easily be extended to support new configurations without code changes.
    What are your thoughts on one builder method per config versus a single map for all configs?


[GitHub] spark issue #17467: Ysharma/spark kinesis retries

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    You could just edit the title. Closing this one and opening a new one is also an option.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112565900
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
    @@ -249,6 +252,17 @@ object KinesisInputDStream {
         }
     
         /**
    +      * Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS CloudWatch
    +      * endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
    +      *
    +      * @param conf: Map[String, String] to use for CloudWatch authentication
    +      */
    +    def kinesisConf(conf: Map[String, String]): Builder = {
    --- End diff --
    
    If you want the extensibility of a key/value map for configs, then I would look for a solution that uses ```SparkConf```, so that you rely on the existing facilities Spark already provides. It doesn't make sense to me to introduce a key/value map just for Kinesis, especially since the naming of your keys (e.g. ```spark.streaming.kinesis.retry.waitTime```) would indicate to most users that these are ```SparkConf``` params, not a Kinesis-specific mapping that must be manually set up and passed to the Kinesis stream builder.


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    I'm not familiar with Kinesis. I usually click the blame button and check both the most recent modifier of the code and the committer, e.g., https://github.com/yssharma/spark/blame/4b589adeaef540f6227266ecc628ad41ef0733c3/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
    
    I believe @tdas and @brkyvz are the experts in this area.


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    Fair enough. I took another look and I think I may have been thinking of the way things worked in an earlier revision of this code. I think the case class is reasonable.


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    For reference, this is the failure I got when trying to use `sc` inside `KinesisBackedBlockRDD`:
    
    ```
    - Basic reading from Kinesis *** FAILED ***
      org.apache.spark.SparkException: Task not serializable
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
      at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2284)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2058)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084)
      at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
      ...
      Cause: java.io.NotSerializableException: org.apache.spark.SparkContext
    Serialization stack:
        - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@60c1663c)
        - field (class: org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD, name: org$apache$spark$streaming$kinesis$KinesisBackedBlockRDD$$sc, type: class org.apache.spark.SparkContext)
        - object (class org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD, KinesisBackedBlockRDD[0] at BlockRDD at KinesisBackedBlockRDD.scala:90)
        - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
        - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@52a33c3f)
        - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@71ed560f)
        - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@52a33c3f))
        - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
        - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1] at map at KinesisBackedBlockRDDSuite.scala:83)
        - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: $outer, type: class org.apache.spark.rdd.RDD)
        - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, <function0>)
        - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect$1)
        - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13, <function1>)
      at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
      at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
      at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
      at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2284)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2058)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084)
      at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    ```


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112817123
  
    --- Diff: docs/streaming-kinesis-integration.md ---
    @@ -216,3 +216,7 @@ de-aggregate records during consumption.
     - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
       - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
       - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
    +
    +- Kinesis retry configurations
    --- End diff --
    
    @brkyvz or another Spark committer might have better suggestions here, but I would suggest making this section a new heading (rather than part of **Kinesis Checkpointing**) and adding a brief explanatory sentence, e.g.:
    
    ```
    #### Kinesis retry configuration
    - A Kinesis DStream will retry any failed request to the Kinesis API. The following SparkConf properties can be set in order to customize the behavior of the retry logic:
    ```
    
    followed by the rest of your changes here.
    
    This also reminds me that I owe @brkyvz a change to add docs for the stream builder interface here :)


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114584422
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -284,17 +282,12 @@ class KinesisSequenceRangeIterator(
         result.getOrElse {
           if (isTimedOut) {
             throw new SparkException(
    -          s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError)
    +          s"Timed out after ${kinesisReadConfigs.retryTimeoutMs} ms while "
    +          + "$message, last exception: ", lastError)
    --- End diff --
    
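    The second literal is missing the `s` interpolator: it should be `s"$message, last exception: "`, otherwise `$message` is printed literally.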
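    A small, self-contained illustration of why the prefix matters when a message is split across concatenated literals. The `message` and `timeoutMs` values are made-up examples, and `InterpolationSketch` is a hypothetical name.
    
    ```scala
    object InterpolationSketch {
      val message = "getting records" // hypothetical example values
      val timeoutMs = 10000L
    
      // Bug: the second literal has no `s` prefix, so "$message" is kept verbatim.
      val broken: String =
        s"Timed out after $timeoutMs ms while " +
          "$message, last exception: "
    
      // Fix: every literal that contains a placeholder needs its own `s` prefix.
      val fixed: String =
        s"Timed out after $timeoutMs ms while " +
          s"$message, last exception: "
    }
    ```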


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @budde: Implemented the review changes and ran the Scala style checks.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112765111
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -17,21 +17,24 @@
     
     package org.apache.spark.streaming.kinesis
     
    -import scala.collection.JavaConverters._
    -import scala.reflect.ClassTag
    -import scala.util.control.NonFatal
    -
    -import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
    +import com.amazonaws.auth.AWSCredentials
     import com.amazonaws.services.kinesis.AmazonKinesisClient
     import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
     import com.amazonaws.services.kinesis.model._
    -
     import org.apache.spark._
     import org.apache.spark.internal.Logging
    +import org.apache.spark.network.util.JavaUtils
     import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition}
     import org.apache.spark.storage.BlockId
     import org.apache.spark.util.NextIterator
     
    +import scala.collection.JavaConverters._
    --- End diff --
    
    Why change the ordering of this import group? I don't think this is consistent with the scalastyle for this project.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112816898
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -295,6 +306,23 @@ class KinesisSequenceRangeIterator(
     
     private[streaming]
     object KinesisSequenceRangeIterator {
    -  val MAX_RETRIES = 3
    -  val MIN_RETRY_WAIT_TIME_MS = 100
    +  /**
    +   * The maximum number of attempts to be made to kinesis. Defaults to 3.
    +   */
    +  val MAX_RETRIES = "3"
    +
    +  /**
    +   * The interval between consequent kinesis retries. Defaults to 100ms.
    +   */
    +  val MIN_RETRY_WAIT_TIME_MS = "100ms"
    +
    +  /**
    +   * Key for configuring the retry wait time for kinesis. The values can be passed to SparkConf.
    --- End diff --
    
    *nit:* I'd make the following tweaks here:
    
    ```scala
    /**
     * SparkConf key for configuring the wait time to use before retrying a Kinesis attempt.
     */
    ```


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @felixcheung : would love to get more review comments to improve the patch.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112764344
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -83,7 +86,8 @@ class KinesisBackedBlockRDD[T: ClassTag](
         @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
         val retryTimeoutMs: Int = 10000,
         val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _,
    -    val kinesisCreds: SparkAWSCredentials = DefaultCredentials
    +    val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
    +    val sparkConf: SparkConf = new SparkConf()
    --- End diff --
    
    Why does this need to be provided as a constructor parameter? You'll want to use the global ```SparkConf``` for the context via ```sc.getConf```. To avoid bringing ```sc``` into the serialized closure for the ```compute()``` method and raising an exception you can alias it as a private field in this class:
    
    ```scala
    private val sparkConf: SparkConf = sc.getConf
    ```
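    This works because a constructor parameter that is only used during construction does not become a field, whereas one referenced from a method (such as `compute()`) is kept as a field and serialized along with the object, which is what the `Task not serializable` trace earlier in this thread shows for `sc`. A Spark-free sketch, assuming hypothetical `FakeContext` and `FakeConf` classes that play the roles of the non-serializable `SparkContext` and the serializable `SparkConf`:
    
    ```scala
    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
    
    // Hypothetical stand-ins: FakeConf is serializable (like SparkConf),
    // FakeContext is not (like SparkContext).
    case class FakeConf(settings: Map[String, String])
    class FakeContext(val conf: FakeConf)
    
    // `ctx` is used inside a method, so the compiler keeps it as a field and
    // serializing this object also tries to serialize the context.
    class UsesContextInMethod(ctx: FakeContext) extends Serializable {
      def waitTime: String = ctx.conf.settings.getOrElse("waitTime", "100ms")
    }
    
    // `ctx` is only used while constructing the object; just the conf is kept.
    class AliasesConf(ctx: FakeContext) extends Serializable {
      private val conf: FakeConf = ctx.conf
      def waitTime: String = conf.settings.getOrElse("waitTime", "100ms")
    }
    
    object ClosureSketch {
      // Returns true if Java serialization of `obj` succeeds.
      def canSerialize(obj: AnyRef): Boolean = {
        val out = new ObjectOutputStream(new ByteArrayOutputStream())
        try {
          out.writeObject(obj)
          true
        } catch {
          case _: NotSerializableException => false
        } finally {
          out.close()
        }
      }
    }
    ```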


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114850001
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kinesis
    +
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.streaming.StreamingContext
    +
    +/**
    + * Configurations to pass to the [KinesisBackedBlockRDD].
    + *
    + * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
    + * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
    + *                         Defaults to 100ms.
    + * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
    +*                         Defaults to batch duration provided for streaming,
    --- End diff --
    
    *nit*: You're missing a space here and on the following line


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @tdas @brkyvz
    - Added changes that remove the other constants hardcoded in multiple places.
    - Squashed 3 commits into a single commit.


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @brkyvz are you OK with this PR at a high level? If yes, I can help review and shepherd it.


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @brkyvz - Added new changes that add:
    - A case class `KinesisReadConfigurations` that collects all the Kinesis read configs in a single place
    - A test class that passes the Kinesis configs via `SparkConf`; these are used to create the Kinesis configs object in `KinesisInputDStream`, which is then passed down to `KinesisBackedBlockRDD`
    - Docs improvements
    
    I also tried `PrivateMethodTester` but wasn't able to access the private method `KinesisSequenceRangeIterator#retryOrTimeout`, probably because of the generics used in the method. As an alternative, I fetched the RDDs directly and checked the configs passed to them.
    I would still like to learn how to get `retryOrTimeout` working, just out of interest. The error is below:
    
    ```
        // KinesisSequenceRangeIterator # retryOrTimeout
        val retryOrTimeoutMethod = PrivateMethod[Object]('retryOrTimeout) // <<<- Issue
    
        val partitions = kinesisRDD.partitions.map {
          _.asInstanceOf[KinesisBackedBlockRDDPartition] }.toSeq
    
        seqNumRanges1.ranges.map{ range =>
          val seqRangeIter =
            new KinesisSequenceRangeIterator(DefaultCredentials.provider.getCredentials,
            dummyEndpointUrl, dummyRegionName, range, kinesisRDD.kinesisReadConfigs)
    
          seqRangeIter.invokePrivate(retryOrTimeoutMethod("Passing custom message"))
    
        }
    
    
    
      - Kinesis read with custom configurations *** FAILED ***
      java.lang.IllegalArgumentException: Can't find a private method named: retryOrTimeout
      at org.scalatest.PrivateMethodTester$Invoker.invokePrivate(PrivateMethodTester.scala:247)
      at org.apache.spark.streaming.kinesis.KinesisStreamTests$$anonfun$7$$anonfun$apply$mcV$sp$13.apply(KinesisStreamSuite.scala:286)
      at org.apache.spark.streaming.kinesis.KinesisStreamTests$$anonfun$7$$anonfun$apply$mcV$sp$13.apply(KinesisStreamSuite.scala:281)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      at scala.collection.immutable.List.map(List.scala:285)
      at org.apache.spark.streaming.kinesis.KinesisStreamTests$$anonfun$7.apply$mcV$sp(KinesisStreamSuite.scala:281)
      at org.apache.spark.streaming.kinesis.KinesisStreamTests$$anonfun$7.apply(KinesisStreamSuite.scala:237)
    ```
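    
    A possible explanation for the lookup failure, offered as a guess rather than a confirmed diagnosis. It assumes `retryOrTimeout` is declared as a curried method with a by-name body, e.g. `retryOrTimeout[T](message: String)(body: => T): T`. Scala compiles such a method into one JVM method with two parameters, a `String` and a `scala.Function0` for the by-name body. `PrivateMethodTester` appears to match candidate methods by argument count, so invoking it with only the message would find nothing. The standalone sketch below uses plain Java reflection on a hypothetical `Demo` class (not Spark code) to show the compiled shape and to call the method with both arguments.
    
    ```scala
    import java.lang.reflect.Method
    
    // Hypothetical class with the same shape of private method (not Spark code).
    class Demo {
      private def retryOrTimeout[T](message: String)(body: => T): T = body
      def run(): Int = retryOrTimeout("called normally")(1)
    }
    
    val method: Method =
      classOf[Demo].getDeclaredMethods.filter(_.getName == "retryOrTimeout").head
    // Both parameter lists end up in one JVM signature; the by-name body is a Function0.
    val paramTypes: List[String] = method.getParameterTypes.map(_.getName).toList
    method.setAccessible(true)
    // Supplying the message *and* the body as a function lets reflection invoke it.
    val body: () => Int = () => 42
    val result: AnyRef = method.invoke(new Demo, "invoked reflectively", body)
    ```
    
    If this is the cause, passing the body as a second argument (something like `retryOrTimeoutMethod("Passing custom message", () => ...)`) might let `invokePrivate` find the method. That variant has not been tried here.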


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @budde - thanks for taking time to review it. Appreciate it.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112848595
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator(
         endpointUrl: String,
         regionId: String,
         range: SequenceNumberRange,
    -    retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
    +    retryTimeoutMs: Int,
    +    sparkConf: SparkConf) extends NextIterator[Record] with Logging {
    --- End diff --
    
    I would prefer a specialized case class,
    something like:
    ```scala
    case class KinesisReadConfigurations(
      maxRetries: Int,
      retryWaitTimeMs: Long,
      retryTimeoutMs: Long)
    ```


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112766374
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -147,6 +153,17 @@ class KinesisSequenceRangeIterator(
       private var lastSeqNumber: String = null
       private var internalIterator: Iterator[Record] = null
     
    +  // variable for kinesis wait time interval between next retry
    +  private val kinesisWaitTimeMs = JavaUtils.timeStringAsMs(
    +    Try {sparkConf.get("spark.streaming.kinesis.retry.waitTime")}
    --- End diff --
    
    It may also be useful to declare these keys as public constants in a sensible location such as the [companion object to ```KinesisInputDStream```](https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala#L84), e.g.:
    
    ```scala
    object KinesisInputDStream {
    ...
      /**
       * Relevant doc
       */
      val RETRY_WAIT_TIME_KEY = "spark.streaming.kinesis.retry.waitTime"
     
      /**
       * Relevant doc
       */
      val RETRY_MAX_ATTEMPTS_KEY = "spark.streaming.kinesis.retry.maxAttempts"
    ...
    ```
    
    This will make things a little less brittle for users who want to dynamically fill in SparkConf values in their apps. You would also be able to use these constants in unit tests here.
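    
    The sketch below shows how such constants could be consumed. It takes the key names from this comment and the defaults ("100ms" and 3) from the PR's docs. It is a minimal sketch with several stand-ins:
    - `KinesisRetryKeys` is a hypothetical name.
    - A plain `Map[String, String]` replaces `SparkConf` so the snippet runs without Spark.
    - `parseTimeMs` is a hypothetical, simplified substitute for `JavaUtils.timeStringAsMs`. It only accepts a bare number of milliseconds, `<n>ms`, or `<n>s`.
    
    ```scala
    // Hypothetical holder for the public keys and their defaults.
    object KinesisRetryKeys {
      val RETRY_WAIT_TIME_KEY = "spark.streaming.kinesis.retry.waitTime"
      val RETRY_MAX_ATTEMPTS_KEY = "spark.streaming.kinesis.retry.maxAttempts"
      val DEFAULT_RETRY_WAIT_TIME = "100ms"
      val DEFAULT_MAX_ATTEMPTS = 3
    }
    
    // Simplified stand-in for JavaUtils.timeStringAsMs ("ms" is checked before "s").
    def parseTimeMs(s: String): Long = {
      val t = s.trim.toLowerCase
      if (t.endsWith("ms")) t.dropRight(2).trim.toLong
      else if (t.endsWith("s")) t.dropRight(1).trim.toLong * 1000L
      else t.toLong
    }
    
    // Reads (maxAttempts, waitTimeMs), falling back to the defaults for missing keys.
    def readRetrySettings(conf: Map[String, String]): (Int, Long) = {
      import KinesisRetryKeys._
      val maxAttempts =
        conf.get(RETRY_MAX_ATTEMPTS_KEY).map(_.toInt).getOrElse(DEFAULT_MAX_ATTEMPTS)
      val waitMs = parseTimeMs(conf.getOrElse(RETRY_WAIT_TIME_KEY, DEFAULT_RETRY_WAIT_TIME))
      (maxAttempts, waitMs)
    }
    ```
    
    Both the reader and user code reference the same constants. As a result, a misspelled key becomes a compile error instead of a setting that is silently ignored.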


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114855947
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kinesis
    +
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.streaming.StreamingContext
    +
    +/**
    + * Configurations to pass to the [KinesisBackedBlockRDD].
    + *
    + * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
    + * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
    + *                         Defaults to 100ms.
    + * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
    +*                         Defaults to batch duration provided for streaming,
    +*                         else uses 10000 if invoked directly.
    + */
    +private[kinesis] case class KinesisReadConfigurations(
    +                                                     maxRetries: Int,
    +                                                     retryWaitTimeMs: Long,
    +                                                     retryTimeoutMs: Long)
    +
    +object KinesisReadConfigurations {
    +  def apply(): KinesisReadConfigurations = {
    +    KinesisReadConfigurations(3, 100, 10000)
    --- End diff --
    
    +1


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    Waiting for review, @brkyvz. Thanks.


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @budde - I'm not sure we can test the configured timeouts exactly. I debugged the flow in my test case to verify that the custom values are passed down to the `KinesisBackedBlockRDD`.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114850114
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kinesis
    +
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.streaming.StreamingContext
    +
    +/**
    + * Configurations to pass to the [KinesisBackedBlockRDD].
    + *
    + * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
    + * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
    + *                         Defaults to 100ms.
    + * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
    +*                         Defaults to batch duration provided for streaming,
    +*                         else uses 10000 if invoked directly.
    + */
    +private[kinesis] case class KinesisReadConfigurations(
    +                                                     maxRetries: Int,
    --- End diff --
    
    Incorrect indentation here; it should be 2 soft tabs (4 spaces).
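    
    For reference, here is a sketch of the declaration laid out the way this comment asks. Continuation-line constructor parameters are indented 4 spaces. The zero-argument `apply()` defaults are taken from the diff. `private[kinesis]` and the package clause are dropped only so the snippet compiles on its own.
    
    ```scala
    case class KinesisReadConfigurations(
        maxRetries: Int,
        retryWaitTimeMs: Long,
        retryTimeoutMs: Long)
    
    object KinesisReadConfigurations {
      // Defaults from the diff: 3 attempts, 100 ms between retries, and a
      // 10000 ms timeout when not driven by a streaming batch duration.
      def apply(): KinesisReadConfigurations =
        KinesisReadConfigurations(3, 100, 10000)
    }
    ```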


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112848855
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator(
         endpointUrl: String,
         regionId: String,
         range: SequenceNumberRange,
    -    retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
    +    retryTimeoutMs: Int,
    +    sparkConf: SparkConf) extends NextIterator[Record] with Logging {
    --- End diff --
    
    Or we can pass them via SparkConf, construct the `KinesisReadConfigurations` object in `KinesisInputDStream`, and pass it down to `KinesisBackedBlockRDD`.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112764808
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -147,6 +153,17 @@ class KinesisSequenceRangeIterator(
       private var lastSeqNumber: String = null
       private var internalIterator: Iterator[Record] = null
     
    +  // variable for kinesis wait time interval between next retry
    +  private val kinesisWaitTimeMs = JavaUtils.timeStringAsMs(
    +    Try {sparkConf.get("spark.streaming.kinesis.retry.waitTime")}
    +      .getOrElse(MIN_RETRY_WAIT_TIME_MS)
    +  )
    +
    +  // variable for kinesis max retry attempts
    +  private val kinesisMaxRetries =
    +    Try {sparkConf.get("spark.streaming.kinesis.retry.maxAttempts")}
    --- End diff --
    
    See above


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112816810
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -295,6 +306,23 @@ class KinesisSequenceRangeIterator(
     
     private[streaming]
     object KinesisSequenceRangeIterator {
    -  val MAX_RETRIES = 3
    -  val MIN_RETRY_WAIT_TIME_MS = 100
    +  /**
    +   * The maximum number of attempts to be made to kinesis. Defaults to 3.
    --- End diff --
    
    *nit:* **K**inesis


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112566462
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -147,6 +152,14 @@ class KinesisSequenceRangeIterator(
       private var lastSeqNumber: String = null
       private var internalIterator: Iterator[Record] = null
     
    +  // variable for kinesis wait time interval between next retry
    +  private var kinesisWaitTimeMs = JavaUtils.timeStringAsMs(
    --- End diff --
    
    I don't think you want to do this: ```kinesisWaitTimeMs``` is never reset to the default value after the retry loop exits. I think you should make this a ```val``` and introduce a ```var``` initialized to its value within ```retryOrTimeout()``` to store the wait time for each retry iteration.
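    
    Here is a minimal sketch of the suggested fix. It is not the PR's actual code. The configured wait stays an immutable value, and `retryOrTimeout` copies it into a local `var` that grows on each retry. That way every call starts again from the configured wait. The sketch makes several assumptions:
    - The wait doubles per retry.
    - The timeout check is omitted.
    - `ThrottledException` is a hypothetical stand-in for Kinesis' `ProvisionedThroughputExceededException`.
    - The injectable `sleep` parameter is added so the behaviour can be checked without real delays.
    
    ```scala
    // Hypothetical stand-in for Kinesis' ProvisionedThroughputExceededException.
    class ThrottledException(msg: String) extends RuntimeException(msg)
    
    // Simplified sketch (no timeout check). `maxRetries` counts total attempts,
    // matching the "maximum number of attempts" wording in the PR's docs.
    class RetryingReader(
        maxRetries: Int,
        retryWaitTimeMs: Long,
        sleep: Long => Unit = (ms: Long) => Thread.sleep(ms)) {
    
      def retryOrTimeout[T](message: String)(body: => T): T = {
        // Local copy of the configured wait: backoff from an earlier call
        // never leaks into this one.
        var waitTimeMs = retryWaitTimeMs
        var attempt = 0
        var result: Option[T] = None
        var lastError: Throwable = null
        while (result.isEmpty && attempt < maxRetries) {
          if (attempt > 0) {   // sleep only before a retry
            sleep(waitTimeMs)
            waitTimeMs *= 2    // double the wait for the next retry
          }
          try {
            result = Some(body)
          } catch {
            // Only throttling is retried; other exceptions propagate immediately.
            case e: ThrottledException => lastError = e
          }
          attempt += 1
        }
        result.getOrElse(throw new IllegalStateException(
          s"Gave up after $attempt attempts while $message", lastError))
      }
    }
    ```
    
    Because `sleep` is injected, a unit test can record the waits and assert that the configured value is actually used. That addresses the "are the configurations actually picked up" concern without needing real Kinesis throttling.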


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112849184
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator(
         endpointUrl: String,
         regionId: String,
         range: SequenceNumberRange,
    -    retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
    +    retryTimeoutMs: Int,
    +    sparkConf: SparkConf) extends NextIterator[Record] with Logging {
    --- End diff --
    
    I prefer the latter. Create it in `KinesisInputDStream` and pass it down to `KinesisBackedBlockRDD`


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @brkyvz - thanks for taking the time to review the patch. Appreciate it.
    Implemented all your suggestions. The patch now passes a new map for the Kinesis configs and adds a mechanism to set the configs through the builder.
    
    As for the Spark context, I wanted to use the `SparkContext` available in `KinesisBackedBlockRDD` directly as well (instead of creating a new config map), but the `sc` in `KinesisBackedBlockRDD` is not available on the executors, and trying to use it there causes serialization errors. Passing a separate config map looked like the only simple way to access the Kinesis configs.
    
    The patch now does not use `sc` at all and expects a `kinesisConf` to be passed to the `KinesisInputDStream` builder directly.
    
    Let me know your thoughts. Thanks again for the review comments.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112343115
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -83,7 +83,8 @@ class KinesisBackedBlockRDD[T: ClassTag](
         @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
         val retryTimeoutMs: Int = 10000,
         val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _,
    -    val kinesisCreds: SparkAWSCredentials = DefaultCredentials
    +    val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
    +    val kinesisConf: Map[String, String] = Map.empty
    --- End diff --
    
    +1. I think reading the config values from ```sc``` will be a much cleaner approach.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112344746
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
    @@ -249,6 +252,17 @@ object KinesisInputDStream {
         }
     
         /**
    +      * Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS CloudWatch
    --- End diff --
    
    Documentation is not correct


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112841862
  
    --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala ---
    @@ -101,6 +103,36 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
         }
       }
     
    +  testIfEnabled("Basic reading from Kinesis with modified configurations") {
    --- End diff --
    
    I don't see how this test actually tests the configuration setting. It just tests if things work, not that the configurations are actually picked up.
    



[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112848401
  
    --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala ---
    @@ -101,6 +103,36 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
         }
       }
     
    +  testIfEnabled("Basic reading from Kinesis with modified configurations") {
    --- End diff --
    
    I wasn't able to test the actual waiting on Kinesis. I haven't yet looked at how `PrivateMethodTester` could help us test which values are picked up.
    I used this test case to debug and verify that all the values are passed correctly.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114584091
  
    --- Diff: docs/streaming-kinesis-integration.md ---
    @@ -216,3 +216,7 @@ de-aggregate records during consumption.
     - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
       - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
       - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
    +
    +#### Kinesis retry configuration
    + - `spark.streaming.kinesis.retry.waitTime` : Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit 'ThroughputExceededExceptions', when consuming faster than 5 transactions/second or, exceeding the maximum read rate of 2 MB/second. This configuration can be tweaked to increase the sleep between fetches when a fetch fails to reduce these exceptions. Default is "100ms".
    + - `spark.streaming.kinesis.retry.maxAttempts` : Max number of retries for Kinesis fetches. This config can also be used to tackle the Kinesis `ThroughputExceededExceptions` in scenarios mentioned above. It can be increased to have more number of retries for Kinesis reads. Default is 3.
    --- End diff --
    
    ditto
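The two settings documented above can be pictured as a retry loop around each Kinesis fetch. The following is a minimal, hypothetical sketch, not Spark's implementation. `ThrottledException` stands in for Kinesis's throughput-exceeded error. The sleep function is injectable so the waits can be observed. Doubling the wait after each retry is an assumption made for illustration.

```scala
// Hypothetical stand-in for Kinesis's throughput-exceeded error; not an AWS SDK class.
class ThrottledException(msg: String) extends RuntimeException(msg)

object RetrySketch {
  /**
   * Runs `body`, retrying only when it throws ThrottledException.
   * maxAttempts: total number of attempts, assumed to be at least 1.
   * waitTimeMs: sleep before the first retry; doubled before each later retry (an assumption).
   * sleep: injectable so callers and tests can observe waits instead of blocking.
   */
  def retryOnThrottle[T](maxAttempts: Int, waitTimeMs: Long,
      sleep: Long => Unit = ms => Thread.sleep(ms))(body: => T): T = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    var attempt = 1
    var waitMs = waitTimeMs
    var result: Option[T] = None
    while (result.isEmpty) {
      try {
        result = Some(body)
      } catch {
        // Throttling is retried until attempts run out; any other exception propagates at once.
        case _: ThrottledException if attempt < maxAttempts =>
          sleep(waitMs)
          waitMs *= 2
          attempt += 1
      }
    }
    result.get
  }
}
```

Under this sketch's doubling assumption, with 3 maximum attempts and a 100 ms wait, a fetch that is throttled twice sleeps 100 ms and then 200 ms before succeeding on its third attempt.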


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112764788
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -147,6 +153,17 @@ class KinesisSequenceRangeIterator(
       private var lastSeqNumber: String = null
       private var internalIterator: Iterator[Record] = null
     
    +  // variable for kinesis wait time interval between next retry
    +  private val kinesisWaitTimeMs = JavaUtils.timeStringAsMs(
    +    Try {sparkConf.get("spark.streaming.kinesis.retry.waitTime")}
    --- End diff --
    
    This complexity isn't necessary. You can achieve the same effect by using an alternate form of [```SparkConf.get()```](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkConf@get(key:String,defaultValue:String):String):
    
    ```scala
    private val kinesisWaitTimeMs = JavaUtils.timeStringAsMs(
      sparkConf.get("spark.streaming.kinesis.retry.waitTime", MIN_RETRY_WAIT_TIME_MS))
    ```
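Some context for the suggestion above: `SparkConf.get(key, defaultValue)` returns the default when the key is unset. In that overload the default is itself a String, so it has to be a duration string such as "100ms" before it reaches `JavaUtils.timeStringAsMs`. The following is a rough, self-contained approximation of that flow. A `Map` stands in for `SparkConf`. The `timeStringAsMs` here is a simplified, hypothetical parser that only understands `ms`, `s` and `min`, whereas Spark's real parser accepts more units.

```scala
object ConfSketch {
  // Optional unit suffix; an absent suffix means milliseconds, as with Spark's timeStringAsMs.
  private val TimeString = """(?i)\s*(\d+)\s*(ms|s|min)?\s*""".r

  /** Simplified, hypothetical stand-in for JavaUtils.timeStringAsMs. */
  def timeStringAsMs(str: String): Long = str match {
    case TimeString(num, unit) =>
      val n = num.toLong
      Option(unit).map(_.toLowerCase) match {
        case None | Some("ms") => n
        case Some("s")         => n * 1000L
        case Some("min")       => n * 60L * 1000L
        case Some(other)       => throw new IllegalArgumentException(s"Unsupported unit: $other")
      }
    case _ => throw new NumberFormatException(s"Invalid time string: $str")
  }

  /** Same shape as sparkConf.get(key, "100ms"): the default is a String, parsed afterwards. */
  def retryWaitTimeMs(conf: Map[String, String]): Long =
    timeStringAsMs(conf.getOrElse("spark.streaming.kinesis.retry.waitTime", "100ms"))
}
```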


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112841869
  
    --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala ---
    @@ -101,6 +103,36 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
         }
       }
     
    +  testIfEnabled("Basic reading from Kinesis with modified configurations") {
    +    // Add Kinesis retry configurations
    +    sc.conf.set(RETRY_WAIT_TIME_KEY, "1000ms")
    --- End diff --
    
    we need to clean these up after the test
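One way to address this cleanup request is to record each key's previous value, apply the overrides, run the test body, and restore the old values in a `finally` block. The helper below is hypothetical and uses a mutable `Map` as a stand-in for `SparkConf`. The real `SparkConf` does expose `getOption`, `set` and `remove`, so the same pattern should carry over, although this sketch does not exercise it.

```scala
import scala.collection.mutable

object ConfTestSketch {
  /**
   * Hypothetical helper: applies `overrides` to `conf`, runs `body`, then restores the
   * previous value of each key (or removes keys that were absent), even if `body` throws.
   */
  def withConf[T](conf: mutable.Map[String, String], overrides: (String, String)*)(body: => T): T = {
    // Capture the original state before touching anything.
    val previous = overrides.map { case (k, _) => k -> conf.get(k) }
    overrides.foreach { case (k, v) => conf(k) = v }
    try body
    finally previous.foreach {
      case (k, Some(old)) => conf(k) = old
      case (k, None)      => conf.remove(k)
    }
  }
}
```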


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    **[Test build #3634 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3634/testReport)** for PR 17467 at commit [`ccb6c19`](https://github.com/apache/spark/commit/ccb6c19dfaec4b7667552dfeed3da730923b7b64).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/17467


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @brkyvz - Thanks for the review comments. Updated the patch, please review.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112344999
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala ---
    @@ -249,6 +252,17 @@ object KinesisInputDStream {
         }
     
         /**
    +      * Sets the [[SparkAWSCredentials]] to use for authenticating to the AWS CloudWatch
    +      * endpoint. Will use the same credentials used for AWS Kinesis if no custom value is set.
    +      *
    +      * @param conf: Map[String, String] to use for CloudWatch authentication
    +      */
    +    def kinesisConf(conf: Map[String, String]): Builder = {
    --- End diff --
    
    If using ```SparkConf``` to store these custom config values doesn't end up being feasible, then I'd strongly prefer that we follow the existing approach and have separate builder methods for setting the retry wait time and max attempts.
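The alternative preferred above, dedicated builder methods, might look roughly like the following. The class and method names are hypothetical and are not part of `KinesisInputDStream.Builder`. The fallback defaults (100 ms, 3 attempts) are the ones documented in this PR.

```scala
// Hypothetical value produced by the builder.
final case class RetrySettings(waitTimeMs: Long, maxAttempts: Int)

// Hypothetical builder with one typed setter per setting, instead of a Map[String, String].
class RetrySettingsBuilder {
  private var waitTimeMs: Option[Long] = None
  private var maxAttempts: Option[Int] = None

  /** Sets the wait between retries; rejected immediately if not positive. */
  def retryWaitTimeMs(ms: Long): RetrySettingsBuilder = {
    require(ms > 0, s"retry wait time must be positive, got $ms")
    waitTimeMs = Some(ms)
    this
  }

  /** Sets the maximum number of attempts; rejected immediately if below 1. */
  def retryMaxAttempts(n: Int): RetrySettingsBuilder = {
    require(n >= 1, s"max attempts must be at least 1, got $n")
    maxAttempts = Some(n)
    this
  }

  /** Unset values fall back to the defaults documented in this PR. */
  def build(): RetrySettings =
    RetrySettings(waitTimeMs.getOrElse(100L), maxAttempts.getOrElse(3))
}
```

Compared with a free-form map, typed setters let the compiler catch misspelled option names and let invalid values fail at the call site rather than deep inside a read.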


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @yssharma left some comments


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r115110773
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kinesis
    +
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.streaming.StreamingContext
    +
    +/**
    + * Configurations to pass to the [KinesisBackedBlockRDD].
    + *
    + * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
    + * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
    + *                         Defaults to 100ms.
    + * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
    +*                         Defaults to batch duration provided for streaming,
    +*                         else uses 10000 if invoked directly.
    + */
    +private[kinesis] case class KinesisReadConfigurations(
    +                                                     maxRetries: Int,
    +                                                     retryWaitTimeMs: Long,
    +                                                     retryTimeoutMs: Long)
    +
    +object KinesisReadConfigurations {
    +  def apply(): KinesisReadConfigurations = {
    --- End diff --
    
    It can be used in places where we don't have the `SparkConf`. I am using this in `KinesisBackedBlockRDD`'s constructor.
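The companion-object pattern described here can be sketched as follows: a no-argument `apply()` for callers without a `SparkConf`, plus a conf-driven variant. `ReadConfigSketch` is a hypothetical stand-in for `KinesisReadConfigurations`. The field names and defaults follow the doc comment in the diff above: 3 retries, a 100 ms wait, and a 10000 ms timeout, or the batch duration when streaming. A `Map` replaces `SparkConf`, and the wait-time parsing only handles millisecond values for brevity.

```scala
// Hypothetical stand-in mirroring the fields of KinesisReadConfigurations in the diff.
case class ReadConfigSketch(maxRetries: Int, retryWaitTimeMs: Long, retryTimeoutMs: Long)

object ReadConfigSketch {
  val RetryMaxAttemptsKey = "spark.streaming.kinesis.retry.maxAttempts"
  val RetryWaitTimeKey = "spark.streaming.kinesis.retry.waitTime"
  val DefaultMaxRetries = 3
  val DefaultRetryWaitTimeMs = 100L
  val DefaultRetryTimeoutMs = 10000L

  /** No conf available (e.g. the RDD is constructed directly): use the documented defaults. */
  def apply(): ReadConfigSketch =
    ReadConfigSketch(DefaultMaxRetries, DefaultRetryWaitTimeMs, DefaultRetryTimeoutMs)

  /** Streaming path: read overrides from the conf and use the batch duration as the timeout. */
  def apply(conf: Map[String, String], batchDurationMs: Long): ReadConfigSketch =
    ReadConfigSketch(
      conf.get(RetryMaxAttemptsKey).map(_.toInt).getOrElse(DefaultMaxRetries),
      // Accepts "1000ms" or "1000" only; a simplification of real duration parsing.
      conf.get(RetryWaitTimeKey).map(_.stripSuffix("ms").trim.toLong).getOrElse(DefaultRetryWaitTimeMs),
      batchDurationMs)
}
```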


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    Awesome. Thanks @budde @brkyvz for reviews and patch improvements.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112764350
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -112,7 +116,8 @@ class KinesisBackedBlockRDD[T: ClassTag](
           val credentials = kinesisCreds.provider.getCredentials
           partition.seqNumberRanges.ranges.iterator.flatMap { range =>
             new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
    -          range, retryTimeoutMs).map(messageHandler)
    +          range, retryTimeoutMs, sparkConf
    +        ).map(messageHandler)
    --- End diff --
    
    *nit:* Move this to the end of the previous line.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112766633
  
    --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala ---
    @@ -101,6 +101,37 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
         }
       }
     
    +  testIfEnabled("Basic reading from Kinesis with modified configurations") {
    +    // Add Kinesis retry configurations
    +    sc.conf.set("spark.streaming.kinesis.retry.waitTime", "1000ms")
    +    sc.conf.set("spark.streaming.kinesis.retry.maxAttempts", "5")
    +
    +    // Verify all data using multiple ranges in a single RDD partition
    +    val receivedData1 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
    +      testUtils.endpointUrl, fakeBlockIds(1),
    +      Array(SequenceNumberRanges(allRanges.toArray)),
    +      sparkConf = sc.getConf).map { bytes => new String(bytes).toInt }.collect()
    +    assert(receivedData1.toSet === testData.toSet)
    +
    +    // Verify all data using one range in each of the multiple RDD partitions
    +    val receivedData2 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
    +      testUtils.endpointUrl, fakeBlockIds(allRanges.size),
    +      allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray,
    +      sparkConf = sc.getConf).map { bytes => new String(bytes).toInt }.collect()
    +    assert(receivedData2.toSet === testData.toSet)
    +
    +    // Verify ordering within each partition
    +    val receivedData3 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
    +      testUtils.endpointUrl, fakeBlockIds(allRanges.size),
    +      allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray,
    +      sparkConf = sc.getConf
    +    ).map { bytes => new String(bytes).toInt }.collectPartitions()
    --- End diff --
    
    *nit:* Move this to the end of the previous line.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112343978
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -147,6 +152,14 @@ class KinesisSequenceRangeIterator(
       private var lastSeqNumber: String = null
       private var internalIterator: Iterator[Record] = null
     
    +  // variable for kinesis wait time interval between next retry
    +  private var kinesisWaitTimeMs = JavaUtils.timeStringAsMs(
    --- End diff --
    
    Can't this be a ```val```?


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112841668
  
    --- Diff: docs/streaming-kinesis-integration.md ---
    @@ -216,3 +216,7 @@ de-aggregate records during consumption.
     - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`).  This is configurable.
       - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
       - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
    +
    +#### Kinesis retry configurations
    + - `spark.streaming.kinesis.retry.waitTime` : SparkConf for wait time between Kinesis retries (in milliseconds). Default is "100ms".
    --- End diff --
    
    `SparkConf for` is redundant. I would try to focus on when people should actually tweak these, and why these confs are important in the first place.


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by budde <gi...@git.apache.org>.
Github user budde commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @yssharma Fair enough. I'll try to get your update reviewed later today.


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r114855912
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kinesis
    +
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.streaming.StreamingContext
    +
    +/**
    + * Configurations to pass to the [KinesisBackedBlockRDD].
    + *
    + * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
    + * @param retryWaitTimeMs: The interval between consequent Kinesis retries.
    + *                         Defaults to 100ms.
    + * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
    +*                         Defaults to batch duration provided for streaming,
    +*                         else uses 10000 if invoked directly.
    + */
    +private[kinesis] case class KinesisReadConfigurations(
    +                                                     maxRetries: Int,
    --- End diff --
    
    +1


[GitHub] spark issue #17467: [SPARK-20140][DStream] Remove hardcoded kinesis retry wa...

Posted by yssharma <gi...@git.apache.org>.
Github user yssharma commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    @HyukjinKwon What should the next steps be for this PR? Are there any Spark-Kinesis experts who can review the patch?


[GitHub] spark issue #17467: Ysharma/spark kinesis retries

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17467
  
    Can one of the admins verify this patch?


[GitHub] spark pull request #17467: [SPARK-20140][DStream] Remove hardcoded kinesis r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17467#discussion_r112841794
  
    --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---
    @@ -135,7 +139,8 @@ class KinesisSequenceRangeIterator(
         endpointUrl: String,
         regionId: String,
         range: SequenceNumberRange,
    -    retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
    +    retryTimeoutMs: Int,
    +    sparkConf: SparkConf) extends NextIterator[Record] with Logging {
    --- End diff --
    
    I wouldn't pass in the `SparkConf` all the way in here. See how `retryTimeoutMs` has been passed in specifically above. You can do two things:
     1. Pass each of them one by one
     2. Evaluate all the configurations in `KinesisBackedBlockRDD` or one level higher and use a `case class` such as `KinesisReadConfigurations`
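Option 2 can be sketched as follows. The RDD-level class resolves the settings once, and each per-partition iterator receives only the resolved case class, never the whole configuration. Every name here (`ReadSettings`, `RangeIteratorSketch`, `BlockRddSketch`) is hypothetical. A `Map` stands in for `SparkConf`, and the wait-time parsing only handles millisecond values.

```scala
// Hypothetical stand-ins; none of these names are Spark APIs.
final case class ReadSettings(maxRetries: Int, retryWaitTimeMs: Long, retryTimeoutMs: Long)

// Iterator side: depends only on the resolved values it needs.
class RangeIteratorSketch(range: Seq[Long], val settings: ReadSettings) extends Iterator[Long] {
  private val underlying = range.iterator
  override def hasNext: Boolean = underlying.hasNext
  override def next(): Long = underlying.next()
}

// RDD side: settings are resolved once, then shared by every partition's iterator.
class BlockRddSketch(conf: Map[String, String], retryTimeoutMs: Long, partitions: Seq[Seq[Long]]) {
  val settings: ReadSettings = ReadSettings(
    conf.get("spark.streaming.kinesis.retry.maxAttempts").map(_.toInt).getOrElse(3),
    // Only "NNNms" or bare-millisecond values are handled, for brevity.
    conf.get("spark.streaming.kinesis.retry.waitTime").map(_.stripSuffix("ms").trim.toLong).getOrElse(100L),
    retryTimeoutMs)

  def compute(partitionIndex: Int): Iterator[Long] =
    new RangeIteratorSketch(partitions(partitionIndex), settings)
}
```

Keeping the iterator's constructor limited to a small case class also makes it easy to construct in unit tests without building a full configuration.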

