You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by koeninger <gi...@git.apache.org> on 2018/02/11 05:27:30 UTC

[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

GitHub user koeninger opened a pull request:

    https://github.com/apache/spark/pull/20572

    [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive offsets

    ## What changes were proposed in this pull request?
    
    Add a configuration spark.streaming.kafka.allowNonConsecutiveOffsets to allow streaming jobs to proceed on compacted topics (or other situations involving gaps between offsets in the log).
    
    ## How was this patch tested?
    
    Added new unit test
    
    @justinrmiller has been testing this branch in production for a few weeks

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/koeninger/spark-1 SPARK-17147

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20572
    
----
commit 3082de7e43e8c381dc2227005d1e0fc5bd2c3d29
Author: cody koeninger <co...@...>
Date:   2016-10-08T21:21:48Z

    [SPARK-17147][STREAMING][KAFKA] failing test for compacted topics

commit e8ea89ea10527c6723df4af2685004ea67d872cd
Author: cody koeninger <co...@...>
Date:   2016-10-09T04:59:39Z

    [SPARK-17147][STREAMING][KAFKA] test passing for compacted topics

commit 182943e36f596d0cb5841a9c63471bea1dd9047b
Author: cody koeninger <co...@...>
Date:   2018-02-11T04:09:38Z

    spark.streaming.kafka.allowNonConsecutiveOffsets

commit 89f4bc5f4de78cdcc22b5c9b26a27ee9263048c8
Author: cody koeninger <co...@...>
Date:   2018-02-11T04:13:49Z

    [SPARK-17147][STREAMING][KAFKA] remove stray param doc

commit 12e65bedddbcd2407598e69fa3c6fcbcdfc67e5d
Author: cody koeninger <co...@...>
Date:   2018-02-11T04:28:22Z

    [SPARK-17147][STREAMING][KAFKA] prepare for merge of master

commit 2ed51f1f73ee75ffd08355265a72e68e83ef592d
Author: cody koeninger <co...@...>
Date:   2018-02-11T05:19:31Z

    Merge branch 'master' into SPARK-17147

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169490350
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala ---
    @@ -178,51 +196,128 @@ private[spark] class KafkaRDD[K, V](
             s"skipping ${part.topic} ${part.partition}")
           Iterator.empty
         } else {
    -      new KafkaRDDIterator(part, context)
    +      logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
    +        s"offsets ${part.fromOffset} -> ${part.untilOffset}")
    +      if (compacted) {
    +        new CompactedKafkaRDDIterator[K, V](
    +          part,
    +          context,
    +          kafkaParams,
    +          useConsumerCache,
    +          pollTimeout,
    +          cacheInitialCapacity,
    +          cacheMaxCapacity,
    +          cacheLoadFactor
    +        )
    +      } else {
    +        new KafkaRDDIterator[K, V](
    +          part,
    +          context,
    +          kafkaParams,
    +          useConsumerCache,
    +          pollTimeout,
    +          cacheInitialCapacity,
    +          cacheMaxCapacity,
    +          cacheLoadFactor
    +        )
    +      }
         }
       }
    +}
     
    -  /**
    -   * An iterator that fetches messages directly from Kafka for the offsets in partition.
    -   * Uses a cached consumer where possible to take advantage of prefetching
    -   */
    -  private class KafkaRDDIterator(
    -      part: KafkaRDDPartition,
    -      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
    -
    -    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
    -      s"offsets ${part.fromOffset} -> ${part.untilOffset}")
    -
    -    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -
    -    context.addTaskCompletionListener{ context => closeIfNeeded() }
    -
    -    val consumer = if (useConsumerCache) {
    -      CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
    -      if (context.attemptNumber >= 1) {
    -        // just in case the prior attempt failures were cache related
    -        CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
    -      }
    -      CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
    -    } else {
    -      CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
    +/**
    + * An iterator that fetches messages directly from Kafka for the offsets in partition.
    + * Uses a cached consumer where possible to take advantage of prefetching
    + */
    +private class KafkaRDDIterator[K, V](
    +  part: KafkaRDDPartition,
    +  context: TaskContext,
    +  kafkaParams: ju.Map[String, Object],
    +  useConsumerCache: Boolean,
    +  pollTimeout: Long,
    +  cacheInitialCapacity: Int,
    +  cacheMaxCapacity: Int,
    +  cacheLoadFactor: Float
    +) extends Iterator[ConsumerRecord[K, V]] {
    +
    +  val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    +
    +  context.addTaskCompletionListener{ context => closeIfNeeded() }
    +
    +  val consumer = if (useConsumerCache) {
    +    CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
    +    if (context.attemptNumber >= 1) {
    +      // just in case the prior attempt failures were cache related
    +      CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
         }
    +    CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
    +  } else {
    +    CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
    +  }
     
    -    var requestOffset = part.fromOffset
    +  var requestOffset = part.fromOffset
     
    -    def closeIfNeeded(): Unit = {
    -      if (!useConsumerCache && consumer != null) {
    -        consumer.close
    -      }
    +  def closeIfNeeded(): Unit = {
    +    if (!useConsumerCache && consumer != null) {
    +      consumer.close
    --- End diff --
    
    Just nits here, but I'd write `close()` as it clearly has side effects


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    **[Test build #87609 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87609/testReport)** for PR 20572 at commit [`248b511`](https://github.com/apache/spark/commit/248b5111651da3d570768a0f4ffffb603193285c).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    @srowen Sorry to turn on the bat-signal, but would you be able to help find a committer willing to look at this?  
    
    After finally finding someone willing to test in production, don't want this to fall through the cracks.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/782/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87296/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by justinrmiller <gi...@git.apache.org>.
Github user justinrmiller commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Otherwise the job crashes, even for gaps of one or two messages on a couple of partitions (and while fail hard is a good philosophy, in this case it's not really practical for us) so it's pretty significant to us. That said, I've got the code copied into my project so when we go to 2.3.0 we could probably just keep using that.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169537541
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
             s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
           record = buffer.next()
           assert(record.offset == offset,
    -        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
    +        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
    +          s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " +
    +          "spark.streaming.kafka.allowNonConsecutiveOffsets"
    +      )
         }
     
         nextOffset = offset + 1
         record
       }
     
    +  /**
    +   * Start a batch on a compacted topic
    +   */
    +  def compactedStart(offset: Long, timeout: Long): Unit = {
    +    logDebug(s"compacted start $groupId $topic $partition starting $offset")
    +    // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics
    +    if (offset != nextOffset) {
    +      logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset")
    +      seek(offset)
    +      poll(timeout)
    +    }
    +  }
    +
    +  /**
    +   * Get the next record in the batch from a compacted topic.
    +   * Assumes compactedStart has been called first, and ignores gaps.
    +   */
    +  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
    +    if (!buffer.hasNext()) { poll(timeout) }
    +    assert(buffer.hasNext(),
    --- End diff --
    
    That's a "shouldn't happen unless the topicpartition or broker is gone" kind of thing.  Semantically I could see that being more like require than assert, but don't have a strong opinion.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/978/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169490949
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala ---
    @@ -178,51 +196,128 @@ private[spark] class KafkaRDD[K, V](
             s"skipping ${part.topic} ${part.partition}")
           Iterator.empty
         } else {
    -      new KafkaRDDIterator(part, context)
    +      logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
    +        s"offsets ${part.fromOffset} -> ${part.untilOffset}")
    +      if (compacted) {
    +        new CompactedKafkaRDDIterator[K, V](
    +          part,
    +          context,
    +          kafkaParams,
    +          useConsumerCache,
    +          pollTimeout,
    +          cacheInitialCapacity,
    +          cacheMaxCapacity,
    +          cacheLoadFactor
    +        )
    +      } else {
    +        new KafkaRDDIterator[K, V](
    +          part,
    +          context,
    +          kafkaParams,
    +          useConsumerCache,
    +          pollTimeout,
    +          cacheInitialCapacity,
    +          cacheMaxCapacity,
    +          cacheLoadFactor
    +        )
    +      }
         }
       }
    +}
     
    -  /**
    -   * An iterator that fetches messages directly from Kafka for the offsets in partition.
    -   * Uses a cached consumer where possible to take advantage of prefetching
    -   */
    -  private class KafkaRDDIterator(
    -      part: KafkaRDDPartition,
    -      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
    -
    -    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
    -      s"offsets ${part.fromOffset} -> ${part.untilOffset}")
    -
    -    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -
    -    context.addTaskCompletionListener{ context => closeIfNeeded() }
    -
    -    val consumer = if (useConsumerCache) {
    -      CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
    -      if (context.attemptNumber >= 1) {
    -        // just in case the prior attempt failures were cache related
    -        CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
    -      }
    -      CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
    -    } else {
    -      CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
    +/**
    + * An iterator that fetches messages directly from Kafka for the offsets in partition.
    + * Uses a cached consumer where possible to take advantage of prefetching
    + */
    +private class KafkaRDDIterator[K, V](
    +  part: KafkaRDDPartition,
    +  context: TaskContext,
    +  kafkaParams: ju.Map[String, Object],
    +  useConsumerCache: Boolean,
    +  pollTimeout: Long,
    +  cacheInitialCapacity: Int,
    +  cacheMaxCapacity: Int,
    +  cacheLoadFactor: Float
    +) extends Iterator[ConsumerRecord[K, V]] {
    +
    +  val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    +
    +  context.addTaskCompletionListener{ context => closeIfNeeded() }
    +
    +  val consumer = if (useConsumerCache) {
    +    CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
    +    if (context.attemptNumber >= 1) {
    +      // just in case the prior attempt failures were cache related
    +      CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
         }
    +    CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
    +  } else {
    +    CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
    +  }
     
    -    var requestOffset = part.fromOffset
    +  var requestOffset = part.fromOffset
     
    -    def closeIfNeeded(): Unit = {
    -      if (!useConsumerCache && consumer != null) {
    -        consumer.close
    -      }
    +  def closeIfNeeded(): Unit = {
    +    if (!useConsumerCache && consumer != null) {
    +      consumer.close
         }
    +  }
    +
    +  override def hasNext(): Boolean = requestOffset < part.untilOffset
     
    -    override def hasNext(): Boolean = requestOffset < part.untilOffset
    +  override def next(): ConsumerRecord[K, V] = {
    +    assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
    +    val r = consumer.get(requestOffset, pollTimeout)
    +    requestOffset += 1
    +    r
    +  }
    +}
     
    -    override def next(): ConsumerRecord[K, V] = {
    -      assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
    -      val r = consumer.get(requestOffset, pollTimeout)
    -      requestOffset += 1
    -      r
    +/**
    + * An iterator that fetches messages directly from Kafka for the offsets in partition.
    + * Uses a cached consumer where possible to take advantage of prefetching.
    + * Intended for compacted topics, or other cases when non-consecutive offsets are ok.
    + */
    +private class CompactedKafkaRDDIterator[K, V](
    +    part: KafkaRDDPartition,
    +    context: TaskContext,
    +    kafkaParams: ju.Map[String, Object],
    +    useConsumerCache: Boolean,
    +    pollTimeout: Long,
    +    cacheInitialCapacity: Int,
    +    cacheMaxCapacity: Int,
    +    cacheLoadFactor: Float
    +  ) extends KafkaRDDIterator[K, V](
    +    part,
    +    context,
    +    kafkaParams,
    +    useConsumerCache,
    +    pollTimeout,
    +    cacheInitialCapacity,
    +    cacheMaxCapacity,
    +    cacheLoadFactor
    +  ) {
    +
    +  consumer.compactedStart(part.fromOffset, pollTimeout)
    +
    +  var nextRecord = consumer.compactedNext(pollTimeout)
    --- End diff --
    
    `private`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169489226
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
             s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
           record = buffer.next()
           assert(record.offset == offset,
    -        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
    +        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
    +          s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " +
    +          "spark.streaming.kafka.allowNonConsecutiveOffsets"
    +      )
         }
     
         nextOffset = offset + 1
         record
       }
     
    +  /**
    +   * Start a batch on a compacted topic
    +   */
    +  def compactedStart(offset: Long, timeout: Long): Unit = {
    +    logDebug(s"compacted start $groupId $topic $partition starting $offset")
    +    // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics
    +    if (offset != nextOffset) {
    +      logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset")
    +      seek(offset)
    +      poll(timeout)
    +    }
    +  }
    +
    +  /**
    +   * Get the next record in the batch from a compacted topic.
    +   * Assumes compactedStart has been called first, and ignores gaps.
    +   */
    +  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
    +    if (!buffer.hasNext()) { poll(timeout) }
    +    assert(buffer.hasNext(),
    --- End diff --
    
    Should this really be an assert, like, should never happen regardless of input or external state? or just a condition that should generate an exception if false?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87575/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    **[Test build #87576 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87576/testReport)** for PR 20572 at commit [`08f3570`](https://github.com/apache/spark/commit/08f3570c0491f96abcaa9a6dc0f4e3030cfea6c0).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169491061
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala ---
    @@ -22,12 +22,17 @@ import java.{ util => ju }
     import scala.collection.JavaConverters._
     import scala.util.Random
     
    +import kafka.common.TopicAndPartition
    --- End diff --
    
    These are the older Kafka APIs right? this may all be correct, just making sure these are the classes that are needed in a Kafka 0.10 test?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87576/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r170279950
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala ---
    @@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     
       private val preferredHosts = LocationStrategies.PreferConsistent
     
    +  private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
    +    val mockTime = new MockTime()
    +    // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
    +    val logs = new Pool[TopicAndPartition, Log]()
    +    val logDir = kafkaTestUtils.brokerLogDir
    +    val dir = new java.io.File(logDir, topic + "-" + partition)
    +    dir.mkdirs()
    +    val logProps = new ju.Properties()
    +    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
    +    logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: java.lang.Float)
    --- End diff --
    
    Do you have to 'cast' this to a Java Float object to get it to compile?
    `java.lang.Float.valueOf(0.1f)` works too I guess, but equally weird. OK if it's required.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by justinrmiller <gi...@git.apache.org>.
Github user justinrmiller commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Not to pile on, but I was encountering non-consecutive offsets on non-compacted topics once or twice a day at our volume. This patch fixes this (haven't had a problem with it since) and I'm sure there will be others encountering this if they stream continuously tens of billions of messages a day on kafka.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    **[Test build #87690 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87690/testReport)** for PR 20572 at commit [`e3ae845`](https://github.com/apache/spark/commit/e3ae84523621405d2f2b55ec92cb79921aaba961).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    **[Test build #87575 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87575/testReport)** for PR 20572 at commit [`2246750`](https://github.com/apache/spark/commit/224675046be2fbc38fea59c59394736e19042eb4).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169489088
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
             s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
           record = buffer.next()
           assert(record.offset == offset,
    -        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
    +        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
    +          s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " +
    +          "spark.streaming.kafka.allowNonConsecutiveOffsets"
    +      )
         }
     
         nextOffset = offset + 1
         record
       }
     
    +  /**
    +   * Start a batch on a compacted topic
    +   */
    +  def compactedStart(offset: Long, timeout: Long): Unit = {
    +    logDebug(s"compacted start $groupId $topic $partition starting $offset")
    +    // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics
    +    if (offset != nextOffset) {
    +      logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset")
    +      seek(offset)
    +      poll(timeout)
    +    }
    +  }
    +
    +  /**
    +   * Get the next record in the batch from a compacted topic.
    +   * Assumes compactedStart has been called first, and ignores gaps.
    +   */
    +  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
    +    if (!buffer.hasNext()) { poll(timeout) }
    +    assert(buffer.hasNext(),
    +      s"Failed to get records for compacted $groupId $topic $partition after polling for $timeout")
    +    val record = buffer.next()
    +    nextOffset = record.offset + 1
    +    record
    +  }
    +
    +  /**
    +   * Rewind to previous record in the batch from a compacted topic.
    +   * Will throw NoSuchElementException if no previous element
    --- End diff --
    
    Could be a `@throws` tag but no big deal.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169490790
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala ---
    @@ -87,47 +89,63 @@ private[spark] class KafkaRDD[K, V](
         }.toArray
       }
     
    -  override def count(): Long = offsetRanges.map(_.count).sum
    +  override def count(): Long =
    +    if (compacted) {
    +      super.count()
    +    } else {
    +      offsetRanges.map(_.count).sum
    +    }
     
       override def countApprox(
           timeout: Long,
           confidence: Double = 0.95
    -  ): PartialResult[BoundedDouble] = {
    -    val c = count
    -    new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
    -  }
    +  ): PartialResult[BoundedDouble] =
    +    if (compacted) {
    +      super.countApprox(timeout, confidence)
    +    } else {
    +      val c = count
    +      new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
    +    }
     
    -  override def isEmpty(): Boolean = count == 0L
    +  override def isEmpty(): Boolean =
    +    if (compacted) {
    +      super.isEmpty()
    +    } else {
    +      count == 0L
    +    }
     
    -  override def take(num: Int): Array[ConsumerRecord[K, V]] = {
    -    val nonEmptyPartitions = this.partitions
    -      .map(_.asInstanceOf[KafkaRDDPartition])
    -      .filter(_.count > 0)
    +  override def take(num: Int): Array[ConsumerRecord[K, V]] =
    +    if (compacted) {
    +      super.take(num)
    +    } else {
    +      val nonEmptyPartitions = this.partitions
    +        .map(_.asInstanceOf[KafkaRDDPartition])
    +        .filter(_.count > 0)
     
    -    if (num < 1 || nonEmptyPartitions.isEmpty) {
    -      return new Array[ConsumerRecord[K, V]](0)
    -    }
    +      if (num < 1 || nonEmptyPartitions.isEmpty) {
    +        return new Array[ConsumerRecord[K, V]](0)
    +      }
     
    -    // Determine in advance how many messages need to be taken from each partition
    -    val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
    -      val remain = num - result.values.sum
    -      if (remain > 0) {
    -        val taken = Math.min(remain, part.count)
    -        result + (part.index -> taken.toInt)
    -      } else {
    -        result
    +      // Determine in advance how many messages need to be taken from each partition
    +      val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
    +        val remain = num - result.values.sum
    +        if (remain > 0) {
    +          val taken = Math.min(remain, part.count)
    +          result + (part.index -> taken.toInt)
    +        } else {
    +          result
    +        }
           }
    -    }
     
    -    val buf = new ArrayBuffer[ConsumerRecord[K, V]]
    -    val res = context.runJob(
    -      this,
    -      (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
    -      it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
    -    )
    -    res.foreach(buf ++= _)
    -    buf.toArray
    -  }
    +      val buf = new ArrayBuffer[ConsumerRecord[K, V]]
    +      val res = context.runJob(
    +        this,
    +        (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
    +        it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
    +      )
    +      res.foreach(buf ++= _)
    +      buf.toArray
    --- End diff --
    
    I am not sure why this code doesn't just `.flatten` the result of `.runJob` to get an array of all of the results. Feel free to change it, or not. Maybe I'm missing something


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    **[Test build #87575 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87575/testReport)** for PR 20572 at commit [`2246750`](https://github.com/apache/spark/commit/224675046be2fbc38fea59c59394736e19042eb4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    **[Test build #87296 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87296/testReport)** for PR 20572 at commit [`2ed51f1`](https://github.com/apache/spark/commit/2ed51f1f73ee75ffd08355265a72e68e83ef592d).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Given RCs have been going on for 2 months, I'm reluctant to put anything at all into 2.3.0 now. It's borderline for putting into 2.3.1 but if it's fairly well tested, and sort of a 'fix', and does solve a significant problem, I'd support it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1002/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169491706
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka010.mocks
    +
    +import java.util.concurrent._
    +
    +import kafka.utils.Time
    +
    +/**
    + * A class used for unit testing things which depend on the Time interface.
    + *
    + * This class never manually advances the clock, it only does so when you call
    + *   sleep(ms)
    + *
    + * It also comes with an associated scheduler instance for managing background tasks in
    + * a deterministic way.
    + */
    +private[kafka010] class MockTime(@volatile private var currentMs: Long) extends Time {
    +
    +  val scheduler = new MockScheduler(this)
    +
    +  def this() = this(System.currentTimeMillis)
    +
    +  def milliseconds: Long = currentMs
    +
    +  def nanoseconds: Long =
    +    TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS)
    +
    +  def sleep(ms: Long) {
    +    this.currentMs += ms
    +    scheduler.tick()
    +  }
    +
    +  override def toString(): String = "MockTime(%d)".format(milliseconds)
    --- End diff --
    
    Use string interpolation to be consistent?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169663225
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
             s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
           record = buffer.next()
           assert(record.offset == offset,
    -        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
    +        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
    +          s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " +
    +          "spark.streaming.kafka.allowNonConsecutiveOffsets"
    +      )
         }
     
         nextOffset = offset + 1
         record
       }
     
    +  /**
    +   * Start a batch on a compacted topic
    +   */
    +  def compactedStart(offset: Long, timeout: Long): Unit = {
    +    logDebug(s"compacted start $groupId $topic $partition starting $offset")
    +    // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics
    +    if (offset != nextOffset) {
    +      logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset")
    +      seek(offset)
    +      poll(timeout)
    +    }
    +  }
    +
    +  /**
    +   * Get the next record in the batch from a compacted topic.
    +   * Assumes compactedStart has been called first, and ignores gaps.
    +   */
    +  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
    +    if (!buffer.hasNext()) { poll(timeout) }
    +    assert(buffer.hasNext(),
    --- End diff --
    
     `assert` turns into a JVM assert (I think?) and as such would be turned off if assertions are disabled, which is how it ought to run in production. If it's something that _could_ happen at all I think it should be `require`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169538019
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -53,7 +51,7 @@ class CachedKafkaConsumer[K, V] private(
     
       // TODO if the buffer was kept around as a random-access structure,
       // could possibly optimize re-calculating of an RDD in the same batch
    -  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator
    +  protected var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
    --- End diff --
    
    Agreed, think it should be ok


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r170278685
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala ---
    @@ -0,0 +1,100 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka010.mocks
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.PriorityQueue
    +
    +import kafka.utils.{Scheduler, Time}
    +
    +/**
    + * A mock scheduler that executes tasks synchronously using a mock time instance.
    + * Tasks are executed synchronously when the time is advanced.
    + * This class is meant to be used in conjunction with MockTime.
    + *
    + * Example usage
    + * <code>
    + *   val time = new MockTime
    + *   time.scheduler.schedule("a task", println("hello world: " + time.milliseconds), delay = 1000)
    + *   time.sleep(1001) // this should cause our scheduled task to fire
    + * </code>
    + *
    + * Incrementing the time to the exact next execution time of a task will result in that task
    + * executing (it as if execution itself takes no time).
    + */
    +private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
    +
    +  /* a priority queue of tasks ordered by next execution time */
    +  var tasks = new PriorityQueue[MockTask]()
    +
    +  def isStarted: Boolean = true
    +
    +  def startup(): Unit = {}
    +
    +  def shutdown(): Unit = synchronized {
    +    tasks.foreach(_.fun())
    +    tasks.clear()
    +  }
    +
    +  /**
    +   * Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs
    +   * when this method is called and the execution happens synchronously in the calling thread.
    +   * If you are using the scheduler associated with a MockTime instance this call
    +   * will be triggered automatically.
    +   */
    +  def tick() {
    +    this synchronized {
    +      val now = time.milliseconds
    +      while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
    +        /* pop and execute the task with the lowest next execution time */
    +        val curr = tasks.dequeue
    +        curr.fun()
    +        /* if the task is periodic, reschedule it and re-enqueue */
    +        if(curr.periodic) {
    +          curr.nextExecution += curr.period
    +          this.tasks += curr
    +        }
    +      }
    +    }
    +  }
    +
    +  def schedule(
    +      name: String,
    +      fun: () => Unit,
    +      delay: Long = 0,
    +      period: Long = -1,
    +      unit: TimeUnit = TimeUnit.MILLISECONDS) {
    +    this synchronized {
    --- End diff --
    
    I think I'd still write such methods as:
    
    ```
    def foo(): T = synchronized {
       ...
    }
    ```
    
    This is how the rest of the code base does it (and other Scala code I've seen).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Merged to master


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r170799163
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala ---
    @@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     
       private val preferredHosts = LocationStrategies.PreferConsistent
     
    +  private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
    +    val mockTime = new MockTime()
    +    // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
    +    val logs = new Pool[TopicAndPartition, Log]()
    +    val logDir = kafkaTestUtils.brokerLogDir
    +    val dir = new java.io.File(logDir, topic + "-" + partition)
    +    dir.mkdirs()
    +    val logProps = new ju.Properties()
    +    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
    +    logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: java.lang.Float)
    --- End diff --
    
    Yeah, it's necessary, otherwise it gets treated as AnyRef.  Changed to Float.valueOf FWIW


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20572


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r170278078
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala ---
    @@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     
       private val preferredHosts = LocationStrategies.PreferConsistent
     
    +  private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
    +    val mockTime = new MockTime()
    +    // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
    +    val logs = new Pool[TopicAndPartition, Log]()
    +    val logDir = kafkaTestUtils.brokerLogDir
    +    val dir = new java.io.File(logDir, topic + "-" + partition)
    --- End diff --
    
    Import `File`, other `java.*` classes? maybe I'm missing a name conflict.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    **[Test build #87609 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87609/testReport)** for PR 20572 at commit [`248b511`](https://github.com/apache/spark/commit/248b5111651da3d570768a0f4ffffb603193285c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169490896
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala ---
    @@ -178,51 +196,128 @@ private[spark] class KafkaRDD[K, V](
             s"skipping ${part.topic} ${part.partition}")
           Iterator.empty
         } else {
    -      new KafkaRDDIterator(part, context)
    +      logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
    +        s"offsets ${part.fromOffset} -> ${part.untilOffset}")
    +      if (compacted) {
    +        new CompactedKafkaRDDIterator[K, V](
    +          part,
    +          context,
    +          kafkaParams,
    +          useConsumerCache,
    +          pollTimeout,
    +          cacheInitialCapacity,
    +          cacheMaxCapacity,
    +          cacheLoadFactor
    +        )
    +      } else {
    +        new KafkaRDDIterator[K, V](
    +          part,
    +          context,
    +          kafkaParams,
    +          useConsumerCache,
    +          pollTimeout,
    +          cacheInitialCapacity,
    +          cacheMaxCapacity,
    +          cacheLoadFactor
    +        )
    +      }
         }
       }
    +}
     
    -  /**
    -   * An iterator that fetches messages directly from Kafka for the offsets in partition.
    -   * Uses a cached consumer where possible to take advantage of prefetching
    -   */
    -  private class KafkaRDDIterator(
    -      part: KafkaRDDPartition,
    -      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
    -
    -    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
    -      s"offsets ${part.fromOffset} -> ${part.untilOffset}")
    -
    -    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -
    -    context.addTaskCompletionListener{ context => closeIfNeeded() }
    -
    -    val consumer = if (useConsumerCache) {
    -      CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
    -      if (context.attemptNumber >= 1) {
    -        // just in case the prior attempt failures were cache related
    -        CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
    -      }
    -      CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
    -    } else {
    -      CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
    +/**
    + * An iterator that fetches messages directly from Kafka for the offsets in partition.
    + * Uses a cached consumer where possible to take advantage of prefetching
    + */
    +private class KafkaRDDIterator[K, V](
    +  part: KafkaRDDPartition,
    +  context: TaskContext,
    +  kafkaParams: ju.Map[String, Object],
    +  useConsumerCache: Boolean,
    +  pollTimeout: Long,
    +  cacheInitialCapacity: Int,
    +  cacheMaxCapacity: Int,
    +  cacheLoadFactor: Float
    +) extends Iterator[ConsumerRecord[K, V]] {
    +
    +  val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    +
    +  context.addTaskCompletionListener{ context => closeIfNeeded() }
    +
    +  val consumer = if (useConsumerCache) {
    +    CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
    +    if (context.attemptNumber >= 1) {
    +      // just in case the prior attempt failures were cache related
    +      CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
         }
    +    CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
    +  } else {
    +    CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
    +  }
     
    -    var requestOffset = part.fromOffset
    +  var requestOffset = part.fromOffset
     
    -    def closeIfNeeded(): Unit = {
    -      if (!useConsumerCache && consumer != null) {
    -        consumer.close
    -      }
    +  def closeIfNeeded(): Unit = {
    +    if (!useConsumerCache && consumer != null) {
    +      consumer.close
         }
    +  }
    +
    +  override def hasNext(): Boolean = requestOffset < part.untilOffset
     
    -    override def hasNext(): Boolean = requestOffset < part.untilOffset
    +  override def next(): ConsumerRecord[K, V] = {
    +    assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
    --- End diff --
    
    This seems like something that shouldn't be an assert because a caller could call `next()` out of order. Should be an exception, to be sure. `NoSuchElementException`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169536036
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala ---
    @@ -22,12 +22,17 @@ import java.{ util => ju }
     import scala.collection.JavaConverters._
     import scala.util.Random
     
    +import kafka.common.TopicAndPartition
    --- End diff --
    
    Right, LogCleaner hadn't yet been moved to the new apis, added a comment to that effect.
    Think we're ok here because it's just being used to mock up a compacted topic, not in the actual dstream api.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169491574
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka010.mocks
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.PriorityQueue
    +
    +import kafka.utils.{Scheduler, Time}
    +
    +/**
    + * A mock scheduler that executes tasks synchronously using a mock time instance.
    + * Tasks are executed synchronously when the time is advanced.
    + * This class is meant to be used in conjunction with MockTime.
    + *
    + * Example usage
    + * <code>
    + *   val time = new MockTime
    + *   time.scheduler.schedule("a task", println("hello world: " + time.milliseconds), delay = 1000)
    + *   time.sleep(1001) // this should cause our scheduled task to fire
    + * </code>
    + *
    + * Incrementing the time to the exact next execution time of a task will result in that task
    + * executing (it as if execution itself takes no time).
    + */
    +private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
    +
    +  /* a priority queue of tasks ordered by next execution time */
    +  var tasks = new PriorityQueue[MockTask]()
    +
    +  def isStarted: Boolean = true
    +
    +  def startup() {}
    +
    +  def shutdown() {
    +    this synchronized {
    +      tasks.foreach(_.fun())
    +      tasks.clear()
    +    }
    +  }
    +
    +  /**
    +   * Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs
    +   * when this method is called and the execution happens synchronously in the calling thread.
    +   * If you are using the scheduler associated with a MockTime instance this call
    +   * will be triggered automatically.
    +   */
    +  def tick() {
    +    this synchronized {
    +      val now = time.milliseconds
    +      while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
    +        /* pop and execute the task with the lowest next execution time */
    +        val curr = tasks.dequeue
    +        curr.fun()
    +        /* if the task is periodic, reschedule it and re-enqueue */
    +        if(curr.periodic) {
    +          curr.nextExecution += curr.period
    +          this.tasks += curr
    +        }
    +      }
    +    }
    +  }
    +
    +  def schedule(
    +      name: String,
    +      fun: () => Unit,
    +      delay: Long = 0,
    +      period: Long = -1,
    +      unit: TimeUnit = TimeUnit.MILLISECONDS) {
    +    this synchronized {
    +      tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
    +      tick()
    +    }
    +  }
    +
    +}
    +
    +case class MockTask(
    +    val name: String,
    +    val fun: () => Unit,
    +    var nextExecution: Long,
    +    val period: Long) extends Ordered[MockTask] {
    +  def periodic: Boolean = period >= 0
    +  def compare(t: MockTask): Int = {
    +    if (t.nextExecution == nextExecution) {
    --- End diff --
    
    `java.lang.Long.compare(t.nextExecution, nextExecution)` does this too in a line, but whatever


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169850605
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
             s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
           record = buffer.next()
           assert(record.offset == offset,
    -        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
    +        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
    +          s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " +
    +          "spark.streaming.kafka.allowNonConsecutiveOffsets"
    +      )
         }
     
         nextOffset = offset + 1
         record
       }
     
    +  /**
    +   * Start a batch on a compacted topic
    +   */
    +  def compactedStart(offset: Long, timeout: Long): Unit = {
    +    logDebug(s"compacted start $groupId $topic $partition starting $offset")
    +    // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics
    +    if (offset != nextOffset) {
    +      logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset")
    +      seek(offset)
    +      poll(timeout)
    +    }
    +  }
    +
    +  /**
    +   * Get the next record in the batch from a compacted topic.
    +   * Assumes compactedStart has been called first, and ignores gaps.
    +   */
    +  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
    +    if (!buffer.hasNext()) { poll(timeout) }
    +    assert(buffer.hasNext(),
    --- End diff --
    
    Agreed that require is better, will fix it in a sec
    
    Pretty sure assert is just a function in Predef.scala that throws an AssertionError, it's not like a java assert statement that can be en / disabled with java -ea / -da.  Tested it out:
    
    https://gist.github.com/koeninger/6155cd94a19d1a6373ba0b40039e97e3
    
    Disabling scala asserts can be done at compile time with -Xdisable-assertions


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r170279504
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala ---
    @@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     
       private val preferredHosts = LocationStrategies.PreferConsistent
     
    +  private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
    +    val mockTime = new MockTime()
    +    // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
    +    val logs = new Pool[TopicAndPartition, Log]()
    +    val logDir = kafkaTestUtils.brokerLogDir
    +    val dir = new java.io.File(logDir, topic + "-" + partition)
    +    dir.mkdirs()
    +    val logProps = new ju.Properties()
    +    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
    +    logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: java.lang.Float)
    +    val log = new Log(
    +      dir,
    +      LogConfig(logProps),
    +      0L,
    +      mockTime.scheduler,
    +      mockTime
    +    )
    +    messages.foreach { case (k, v) =>
    +        val msg = new ByteBufferMessageSet(
    --- End diff --
    
    Unindent one level?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r170279150
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala ---
    @@ -87,47 +89,60 @@ private[spark] class KafkaRDD[K, V](
         }.toArray
       }
     
    -  override def count(): Long = offsetRanges.map(_.count).sum
    +  override def count(): Long =
    +    if (compacted) {
    +      super.count()
    +    } else {
    +      offsetRanges.map(_.count).sum
    +    }
     
       override def countApprox(
           timeout: Long,
           confidence: Double = 0.95
    -  ): PartialResult[BoundedDouble] = {
    -    val c = count
    -    new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
    -  }
    +  ): PartialResult[BoundedDouble] =
    +    if (compacted) {
    +      super.countApprox(timeout, confidence)
    +    } else {
    +      val c = count
    +      new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
    +    }
     
    -  override def isEmpty(): Boolean = count == 0L
    +  override def isEmpty(): Boolean =
    +    if (compacted) {
    +      super.isEmpty()
    +    } else {
    +      count == 0L
    +    }
     
    -  override def take(num: Int): Array[ConsumerRecord[K, V]] = {
    -    val nonEmptyPartitions = this.partitions
    -      .map(_.asInstanceOf[KafkaRDDPartition])
    -      .filter(_.count > 0)
    +  override def take(num: Int): Array[ConsumerRecord[K, V]] =
    +    if (compacted) {
    +      super.take(num)
    +    } else {
    +      val nonEmptyPartitions = this.partitions
    +        .map(_.asInstanceOf[KafkaRDDPartition])
    +        .filter(_.count > 0)
     
    -    if (num < 1 || nonEmptyPartitions.isEmpty) {
    -      return new Array[ConsumerRecord[K, V]](0)
    -    }
    +      if (num < 1 || nonEmptyPartitions.isEmpty) {
    --- End diff --
    
    I guess you could check `num < 1` before the map/filter, but it's trivial.
    You could write `return Array.empty[ConsumerRecord[K,V]]` too; again trivial.
    Since this is existing code I could see not touching it as well.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/979/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by koeninger <gi...@git.apache.org>.
Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    @zsxwing you have time to review this?  It's been a long standing issue.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169491286
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka010.mocks
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.PriorityQueue
    +
    +import kafka.utils.{Scheduler, Time}
    +
    +/**
    + * A mock scheduler that executes tasks synchronously using a mock time instance.
    + * Tasks are executed synchronously when the time is advanced.
    + * This class is meant to be used in conjunction with MockTime.
    + *
    + * Example usage
    + * <code>
    + *   val time = new MockTime
    + *   time.scheduler.schedule("a task", println("hello world: " + time.milliseconds), delay = 1000)
    + *   time.sleep(1001) // this should cause our scheduled task to fire
    + * </code>
    + *
    + * Incrementing the time to the exact next execution time of a task will result in that task
    + * executing (it as if execution itself takes no time).
    + */
    +private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
    +
    +  /* a priority queue of tasks ordered by next execution time */
    +  var tasks = new PriorityQueue[MockTask]()
    +
    +  def isStarted: Boolean = true
    +
    +  def startup() {}
    +
    +  def shutdown() {
    --- End diff --
    
    Just a style question or nit, but, what about: `def shutdown(): Unit = synchronized { ...`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1078/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    **[Test build #87576 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87576/testReport)** for PR 20572 at commit [`08f3570`](https://github.com/apache/spark/commit/08f3570c0491f96abcaa9a6dc0f4e3030cfea6c0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169489462
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -53,7 +51,7 @@ class CachedKafkaConsumer[K, V] private(
     
       // TODO if the buffer was kept around as a random-access structure,
       // could possibly optimize re-calculating of an RDD in the same batch
    -  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator
    +  protected var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
    --- End diff --
    
    Was going to comment on the fact that it's `protected`, and what if a subclass depend on this, but, as the whole class is package-private I guess that's not an issue?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r170278317
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala ---
    @@ -162,17 +162,22 @@ private[kafka010] class KafkaTestUtils extends Logging {
       }
     
       /** Create a Kafka topic and wait until it is propagated to the whole cluster */
    -  def createTopic(topic: String, partitions: Int): Unit = {
    -    AdminUtils.createTopic(zkUtils, topic, partitions, 1)
    +  def createTopic(topic: String, partitions: Int, config: Properties): Unit = {
    +    AdminUtils.createTopic(zkUtils, topic, partitions, 1, config)
         // wait until metadata is propagated
         (0 until partitions).foreach { p =>
           waitUntilMetadataIsPropagated(topic, p)
         }
       }
     
    +  /** Create a Kafka topic and wait until it is propagated to the whole cluster */
    +  def createTopic(topic: String, partitions: Int): Unit = {
    +    createTopic(topic, partitions, new Properties)
    +  }
    +
       /** Create a Kafka topic and wait until it is propagated to the whole cluster */
       def createTopic(topic: String): Unit = {
    -    createTopic(topic, 1)
    +    createTopic(topic, 1, new Properties)
    --- End diff --
    
    Nit: `new Properties()`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by justinrmiller <gi...@git.apache.org>.
Github user justinrmiller commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Do you think it would be possible for this to go into 2.3.0?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87690/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r170277915
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala ---
    @@ -172,57 +187,138 @@ private[spark] class KafkaRDD[K, V](
     
       override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = {
         val part = thePart.asInstanceOf[KafkaRDDPartition]
    -    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
    +    require(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
         if (part.fromOffset == part.untilOffset) {
           logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
             s"skipping ${part.topic} ${part.partition}")
           Iterator.empty
         } else {
    -      new KafkaRDDIterator(part, context)
    +      logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
    +        s"offsets ${part.fromOffset} -> ${part.untilOffset}")
    +      if (compacted) {
    +        new CompactedKafkaRDDIterator[K, V](
    +          part,
    +          context,
    +          kafkaParams,
    +          useConsumerCache,
    +          pollTimeout,
    +          cacheInitialCapacity,
    +          cacheMaxCapacity,
    +          cacheLoadFactor
    +        )
    +      } else {
    +        new KafkaRDDIterator[K, V](
    +          part,
    +          context,
    +          kafkaParams,
    +          useConsumerCache,
    +          pollTimeout,
    +          cacheInitialCapacity,
    +          cacheMaxCapacity,
    +          cacheLoadFactor
    +        )
    +      }
         }
       }
    +}
     
    -  /**
    -   * An iterator that fetches messages directly from Kafka for the offsets in partition.
    -   * Uses a cached consumer where possible to take advantage of prefetching
    -   */
    -  private class KafkaRDDIterator(
    -      part: KafkaRDDPartition,
    -      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
    -
    -    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
    -      s"offsets ${part.fromOffset} -> ${part.untilOffset}")
    -
    -    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    +/**
    + * An iterator that fetches messages directly from Kafka for the offsets in partition.
    + * Uses a cached consumer where possible to take advantage of prefetching
    + */
    +private class KafkaRDDIterator[K, V](
    +  part: KafkaRDDPartition,
    +  context: TaskContext,
    +  kafkaParams: ju.Map[String, Object],
    +  useConsumerCache: Boolean,
    +  pollTimeout: Long,
    +  cacheInitialCapacity: Int,
    +  cacheMaxCapacity: Int,
    +  cacheLoadFactor: Float
    +) extends Iterator[ConsumerRecord[K, V]] {
    +
    +  val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    +
    +  context.addTaskCompletionListener{ context => closeIfNeeded() }
    --- End diff --
    
    This could be `...(_ => closeIfNeeded())`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    **[Test build #87690 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87690/testReport)** for PR 20572 at commit [`e3ae845`](https://github.com/apache/spark/commit/e3ae84523621405d2f2b55ec92cb79921aaba961).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by justinrmiller <gi...@git.apache.org>.
Github user justinrmiller commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    This looks good to me.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    **[Test build #87296 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87296/testReport)** for PR 20572 at commit [`2ed51f1`](https://github.com/apache/spark/commit/2ed51f1f73ee75ffd08355265a72e68e83ef592d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive of...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20572
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87609/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r170278931
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -71,25 +69,62 @@ class CachedKafkaConsumer[K, V] private(
         }
     
         if (!buffer.hasNext()) { poll(timeout) }
    -    assert(buffer.hasNext(),
    +    require(buffer.hasNext(),
           s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
         var record = buffer.next()
     
         if (record.offset != offset) {
           logInfo(s"Buffer miss for $groupId $topic $partition $offset")
           seek(offset)
           poll(timeout)
    -      assert(buffer.hasNext(),
    +      require(buffer.hasNext(),
             s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
           record = buffer.next()
    -      assert(record.offset == offset,
    -        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
    +      require(record.offset == offset,
    +        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
    +          s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " +
    +          "spark.streaming.kafka.allowNonConsecutiveOffsets"
    +      )
         }
     
         nextOffset = offset + 1
         record
       }
     
    +  /**
    +   * Start a batch on a compacted topic
    +   */
    +  def compactedStart(offset: Long, timeout: Long): Unit = {
    +    logDebug(s"compacted start $groupId $topic $partition starting $offset")
    +    // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics
    +    if (offset != nextOffset) {
    +      logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset")
    +      seek(offset)
    +      poll(timeout)
    +    }
    +  }
    +
    +  /**
    +   * Get the next record in the batch from a compacted topic.
    +   * Assumes compactedStart has been called first, and ignores gaps.
    +   */
    +  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
    +    if (!buffer.hasNext()) { poll(timeout) }
    --- End diff --
    
    Nit: I'd expand this onto two lines


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org