You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by zsxwing <gi...@git.apache.org> on 2018/08/08 17:52:00 UTC

[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

GitHub user zsxwing opened a pull request:

    https://github.com/apache/spark/pull/22042

    [SPARK-25005][SS]Support non-consecutive offsets for Kafka

    ## What changes were proposed in this pull request?
    
    As the user uses Kafka transactions to write data, the offsets in Kafka will be non-consecutive. It will contains some transaction (commit or abort) markers. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the date returned by `poll`. However, as `seekToEnd` may move the offset point to these missing offsets, there are 4 possible corner cases we need to support:
    
    - The whole batch contains no data messages
    - The first offset in a batch is not a committed data message
    - The last offset in a batch is not a committed data message
    - There is a gap in the middle of a batch
    
    They are all covered by the new unit tests.
    
    ## How was this patch tested?
    
    The new unit tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark kafka-transaction-read

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22042.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22042
    
----
commit dc18a6ff59fe7c48ed188a4eb9a6abf04caee0bd
Author: Shixiong Zhu <zs...@...>
Date:   2018-08-08T17:40:37Z

    Support non-consecutive offsets for Kafka

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211801676
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
           offset: Long,
           untilOffset: Long,
           pollTimeoutMs: Long,
    -      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
    -      // This is the first fetch, or the last pre-fetched data has been drained.
    +      failOnDataLoss: Boolean): FetchedRecord = {
    +    if (offset != fetchedData.nextOffsetInFetchedData) {
    +      // This is the first fetch, or the fetched data has been reset.
           // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
    -      seek(offset)
    -      poll(pollTimeoutMs)
    -    }
    -
    -    if (!fetchedData.hasNext()) {
    -      // We cannot fetch anything after `poll`. Two possible cases:
    -      // - `offset` is out of range so that Kafka returns nothing. Just throw
    -      // `OffsetOutOfRangeException` to let the caller handle it.
    -      // - Cannot fetch any data before timeout. TimeoutException will be thrown.
    -      val range = getAvailableOffsetRange()
    -      if (offset < range.earliest || offset >= range.latest) {
    -        throw new OffsetOutOfRangeException(
    -          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
    +      poll(offset, pollTimeoutMs)
    --- End diff --
    
    comment that this method updates `fetchedData`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2579/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2216/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r212033844
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -337,6 +338,7 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader(
           val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
           if (record != null) {
             nextRow = converter.toUnsafeRow(record)
    +        nextOffset = record.offset + 1
    --- End diff --
    
     We should update `nextOffset` to `record.offset + 1` rather that `nextOffset + 1`. Otherwise, it may return duplicated records when `failOnDataLoss` is `false`. I will submit another PR to push this fix to 2.3 as it's a correctness issue.
    
    In addition, we should change `nextOffset` in the `next` method as the `get` method is designed to be called multiple times.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r212522664
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -161,6 +161,22 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
           s"AddKafkaData(topics = $topics, data = $data, message = $message)"
       }
     
    +  object WithKafkaProducer {
    --- End diff --
    
    nit: This is not creating a KafkaProducer .. as most `With***` methods. The point of this is to force synchronization of the consumer. So maybe rename it to `WithOffsetSync { ... }`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211804454
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
           }
         )
       }
    +
    +  test("read Kafka transactional messages: read_committed") {
    +    // This test will cover the following cases:
    +    // 1. the whole batch contains no data messages
    +    // 2. the first offset in a batch is not a committed data message
    +    // 3. the last offset in a batch is not a committed data message
    +    // 4. there is a gap in the middle of a batch
    +
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 1)
    +
    +    val reader = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("kafka.isolation.level", "read_committed")
    +      .option("maxOffsetsPerTrigger", 3)
    +      .option("subscribe", topic)
    +      .option("startingOffsets", "earliest")
    +      // Set a short timeout to make the test fast. When a batch contains no committed date
    +      // messages, "poll" will wait until timeout.
    +      .option("kafkaConsumer.pollTimeoutMs", 5000)
    +    val kafka = reader.load()
    +      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +      .as[(String, String)]
    +    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    +
    +    val clock = new StreamManualClock
    +
    +    val waitUntilBatchProcessed = AssertOnQuery { q =>
    --- End diff --
    
    use `Execute` 
    and comment on what this does and why we need it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2546/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211805993
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---
    @@ -327,6 +332,14 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
         props
       }
     
    +  def createProducer(usingTrascation: Boolean): KafkaProducer[String, String] = {
    --- End diff --
    
    nit: usingTrascation -> usingTra**n**scation


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94809/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211795985
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -80,6 +90,72 @@ private[kafka010] case class InternalKafkaConsumer(
         kafkaParams: ju.Map[String, Object]) extends Logging {
       import InternalKafkaConsumer._
     
    +  /**
    +   * The internal object to store the fetched data from Kafka consumer and the next offset to poll.
    +   *
    +   * @param records the pre-fetched Kafka records.
    +   * @param nextOffsetInFetchedData the next offset in `records`. We use this to verify if we should
    +   *                                check if the pre-fetched data is still valid.
    +   * @param offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to poll
    +   *                        when `records` is drained.
    +   */
    +  private case class FetchedData(
    +      private var records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    +      var nextOffsetInFetchedData: Long,
    --- End diff --
    
    Make this public getter, private setter.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r209476548
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
           untilOffset: Long,
           pollTimeoutMs: Long,
           failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
    -      // This is the first fetch, or the last pre-fetched data has been drained.
    +    if (offset != nextOffsetInFetchedData) {
    +      // This is the first fetch, or the fetched data has been reset.
           // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
           seek(offset)
           poll(pollTimeoutMs)
    +    } else if (!fetchedData.hasNext) {
    +      // The last pre-fetched data has been drained.
    +      if (offset < offsetAfterPoll) {
    +        // Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
    +        resetFetchedData()
    +        throw new MissingOffsetException(offset, offsetAfterPoll)
    +      } else {
    +        seek(offset)
    +        poll(pollTimeoutMs)
    +      }
         }
     
         if (!fetchedData.hasNext()) {
    -      // We cannot fetch anything after `poll`. Two possible cases:
    +      // We cannot fetch anything after `poll`. Three possible cases:
           // - `offset` is out of range so that Kafka returns nothing. Just throw
           // `OffsetOutOfRangeException` to let the caller handle it.
           // - Cannot fetch any data before timeout. TimeoutException will be thrown.
    +      // - Fetched something but all of them are not valid date messages. In this case, the position
    --- End diff --
    
    date => data


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #95063 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95063/testReport)** for PR 22042 at commit [`a06742f`](https://github.com/apache/spark/commit/a06742fd3d19c3ee6d9c957b446bc5017be009bc).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r212507190
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -160,6 +160,23 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
           s"AddKafkaData(topics = $topics, data = $data, message = $message)"
       }
     
    +  object WithKafkaProducer {
    +    def apply(
    +        topic: String,
    +        producer: KafkaProducer[String, String])(
    --- End diff --
    
    Ping on this comment. Maybe you missed this?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1958/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95056/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211804704
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -160,6 +160,23 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
           s"AddKafkaData(topics = $topics, data = $data, message = $message)"
       }
     
    +  object WithKafkaProducer {
    +    def apply(
    +        topic: String,
    +        producer: KafkaProducer[String, String])(
    --- End diff --
    
    Why pass producer when all you are doing is to pass it to the function. The function can do it on its own.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #95115 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95115/testReport)** for PR 22042 at commit [`603e0bc`](https://github.com/apache/spark/commit/603e0bc9cc822ec3151159a88a521ac063932f11).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211802112
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
           offset: Long,
           untilOffset: Long,
           pollTimeoutMs: Long,
    -      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
    -      // This is the first fetch, or the last pre-fetched data has been drained.
    +      failOnDataLoss: Boolean): FetchedRecord = {
    +    if (offset != fetchedData.nextOffsetInFetchedData) {
    +      // This is the first fetch, or the fetched data has been reset.
           // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
    -      seek(offset)
    -      poll(pollTimeoutMs)
    -    }
    -
    -    if (!fetchedData.hasNext()) {
    -      // We cannot fetch anything after `poll`. Two possible cases:
    -      // - `offset` is out of range so that Kafka returns nothing. Just throw
    -      // `OffsetOutOfRangeException` to let the caller handle it.
    -      // - Cannot fetch any data before timeout. TimeoutException will be thrown.
    -      val range = getAvailableOffsetRange()
    -      if (offset < range.earliest || offset >= range.latest) {
    -        throw new OffsetOutOfRangeException(
    -          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
    +      poll(offset, pollTimeoutMs)
    +    } else if (!fetchedData.hasNext) {
    +      // The last pre-fetched data has been drained.
    +      if (offset < fetchedData.offsetAfterPoll) {
    +        // Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
    --- End diff --
    
    "skip them" is confusing. What does it mean to skip? Why are we still returning something.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r209476712
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -347,9 +391,12 @@ private[kafka010] case class InternalKafkaConsumer(
       }
     
       private def poll(pollTimeoutMs: Long): Unit = {
    +    offsetBeforePoll = consumer.position(topicPartition)
    --- End diff --
    
    This variable `offsetBeforePoll` seems to be only used to identify whether data was actually fetched in a poll and nothing else. Rather than define another var (there are already many that already confusing), why not just return a boolean from poll which is true or false depending on whether poll moved anything.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211805733
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
           }
         )
       }
    +
    +  test("read Kafka transactional messages: read_committed") {
    +    // This test will cover the following cases:
    +    // 1. the whole batch contains no data messages
    +    // 2. the first offset in a batch is not a committed data message
    +    // 3. the last offset in a batch is not a committed data message
    +    // 4. there is a gap in the middle of a batch
    +
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 1)
    +
    +    val reader = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("kafka.isolation.level", "read_committed")
    +      .option("maxOffsetsPerTrigger", 3)
    +      .option("subscribe", topic)
    +      .option("startingOffsets", "earliest")
    +      // Set a short timeout to make the test fast. When a batch contains no committed date
    +      // messages, "poll" will wait until timeout.
    +      .option("kafkaConsumer.pollTimeoutMs", 5000)
    +    val kafka = reader.load()
    +      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +      .as[(String, String)]
    +    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    +
    +    val clock = new StreamManualClock
    +
    +    val waitUntilBatchProcessed = AssertOnQuery { q =>
    +      eventually(Timeout(streamingTimeout)) {
    +        if (!q.exception.isDefined) {
    +          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    +        }
    +      }
    +      if (q.exception.isDefined) {
    +        throw q.exception.get
    +      }
    +      true
    +    }
    +
    +    val producer = testUtils.createProducer(usingTrascation = true)
    +    try {
    +      producer.initTransactions()
    +
    +      testStream(mapped)(
    +        StartStream(ProcessingTime(100), clock),
    +        waitUntilBatchProcessed,
    +        // 1 from smallest, 1 from middle, 8 from biggest
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages. They should be visible only after being committed.
    +          producer.beginTransaction()
    +          (1 to 5).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        // Should not see any uncommitted messages
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 3: _*), // offset 0, 1, 2
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message]
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages and abort the transaction. They should not be read.
    +          producer.beginTransaction()
    +          (6 to 10).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +          producer.abortTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages again. The consumer should skip the above aborted messages and read
    +          // them.
    +          producer.beginTransaction()
    +          (11 to 15).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 15): _*),  // offset: 15, 16, 17*
    +        WithKafkaProducer(topic, producer) { producer =>
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "16")).get()
    +          producer.commitTransaction()
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "17")).get()
    +          producer.commitTransaction()
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "18")).get()
    +          producer.send(new ProducerRecord[String, String](topic, "19")).get()
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 17): _*), // offset: 18, 19*, 20
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 19): _*), // offset: 21*, 22, 23
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 19): _*) // offset: 24*
    +      )
    +    } finally {
    +      producer.close()
    +    }
    +  }
    +
    +  test("read Kafka transactional messages: read_uncommitted") {
    +    // This test will cover the following cases:
    +    // 1. the whole batch contains no data messages
    +    // 2. the first offset in a batch is not a committed data message
    +    // 3. the last offset in a batch is not a committed data message
    +    // 4. there is a gap in the middle of a batch
    +
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 1)
    +
    +    val reader = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("kafka.isolation.level", "read_uncommitted")
    +      .option("maxOffsetsPerTrigger", 3)
    +      .option("subscribe", topic)
    +      .option("startingOffsets", "earliest")
    +      // Set a short timeout to make the test fast. When a batch contains no committed date
    +      // messages, "poll" will wait until timeout.
    +      .option("kafkaConsumer.pollTimeoutMs", 5000)
    +    val kafka = reader.load()
    +      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +      .as[(String, String)]
    +    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    +
    +    val clock = new StreamManualClock
    +
    +    val waitUntilBatchProcessed = AssertOnQuery { q =>
    +      eventually(Timeout(streamingTimeout)) {
    +        if (!q.exception.isDefined) {
    +          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    +        }
    +      }
    +      if (q.exception.isDefined) {
    +        throw q.exception.get
    +      }
    +      true
    +    }
    +
    +    val producer = testUtils.createProducer(usingTrascation = true)
    +    try {
    +      producer.initTransactions()
    +
    +      testStream(mapped)(
    +        StartStream(ProcessingTime(100), clock),
    +        waitUntilBatchProcessed,
    +        // 1 from smallest, 1 from middle, 8 from biggest
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages. They should be visible only after being committed.
    --- End diff --
    
    Why so? This read_uncommitted, right?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r209473432
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -31,6 +31,17 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
     import org.apache.spark.sql.kafka010.KafkaSourceProvider._
     import org.apache.spark.util.UninterruptibleThread
     
    +/**
    + * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
    + * This means it's either a transaction (commit or abort) marker, or an aborted message if
    + * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
    + * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch.
    + */
    +private[kafka010] class MissingOffsetException(
    +    val offset: Long,
    --- End diff --
    
    maybe rename offset to something like missingOffset. Its weird to have a generic named field "offset" next to a specifically named field "nextOffsetToFetch".


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95297/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r209474755
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
           untilOffset: Long,
    --- End diff --
    
    Update docs of this method saying that it can throw MissingOffsetException and what it means?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95063/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211804879
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
           }
         )
       }
    +
    +  test("read Kafka transactional messages: read_committed") {
    +    // This test will cover the following cases:
    +    // 1. the whole batch contains no data messages
    +    // 2. the first offset in a batch is not a committed data message
    +    // 3. the last offset in a batch is not a committed data message
    +    // 4. there is a gap in the middle of a batch
    +
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 1)
    +
    +    val reader = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("kafka.isolation.level", "read_committed")
    +      .option("maxOffsetsPerTrigger", 3)
    +      .option("subscribe", topic)
    +      .option("startingOffsets", "earliest")
    +      // Set a short timeout to make the test fast. When a batch contains no committed date
    +      // messages, "poll" will wait until timeout.
    +      .option("kafkaConsumer.pollTimeoutMs", 5000)
    +    val kafka = reader.load()
    +      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +      .as[(String, String)]
    +    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    +
    +    val clock = new StreamManualClock
    +
    +    val waitUntilBatchProcessed = AssertOnQuery { q =>
    +      eventually(Timeout(streamingTimeout)) {
    +        if (!q.exception.isDefined) {
    +          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    +        }
    +      }
    +      if (q.exception.isDefined) {
    +        throw q.exception.get
    +      }
    +      true
    +    }
    +
    +    val producer = testUtils.createProducer(usingTrascation = true)
    +    try {
    +      producer.initTransactions()
    +
    +      testStream(mapped)(
    +        StartStream(ProcessingTime(100), clock),
    +        waitUntilBatchProcessed,
    +        // 1 from smallest, 1 from middle, 8 from biggest
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages. They should be visible only after being committed.
    +          producer.beginTransaction()
    +          (1 to 5).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        // Should not see any uncommitted messages
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 3: _*), // offset 0, 1, 2
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    --- End diff --
    
    Why is this `waitUntilBatchProcessed` needed? CheckAnswer waits for the batch to complete anyways.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #94809 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94809/testReport)** for PR 22042 at commit [`a9b00b4`](https://github.com/apache/spark/commit/a9b00b4a22f0b6b364cd1b35e2d99923d8b233dc).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #94441 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94441/testReport)** for PR 22042 at commit [`dc18a6f`](https://github.com/apache/spark/commit/dc18a6ff59fe7c48ed188a4eb9a6abf04caee0bd).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #95297 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95297/testReport)** for PR 22042 at commit [`7a02921`](https://github.com/apache/spark/commit/7a02921950cda865e3cd45f1d1635212c2f707c0).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #94808 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94808/testReport)** for PR 22042 at commit [`baef29f`](https://github.com/apache/spark/commit/baef29f2983560c8010681c9bb7e74f711c8f2e7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #95236 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95236/testReport)** for PR 22042 at commit [`7a02921`](https://github.com/apache/spark/commit/7a02921950cda865e3cd45f1d1635212c2f707c0).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22042


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r209475048
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
           untilOffset: Long,
           pollTimeoutMs: Long,
           failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
    -      // This is the first fetch, or the last pre-fetched data has been drained.
    +    if (offset != nextOffsetInFetchedData) {
    +      // This is the first fetch, or the fetched data has been reset.
           // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
           seek(offset)
           poll(pollTimeoutMs)
    +    } else if (!fetchedData.hasNext) {
    +      // The last pre-fetched data has been drained.
    +      if (offset < offsetAfterPoll) {
    --- End diff --
    
    Its hard to understand this condition because it hard to understand what offsetAfterPoll means? Does it refer to the offset that will be fetched next by the KafkaConsumer? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95319/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #94441 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94441/testReport)** for PR 22042 at commit [`dc18a6f`](https://github.com/apache/spark/commit/dc18a6ff59fe7c48ed188a4eb9a6abf04caee0bd).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95115/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #95056 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95056/testReport)** for PR 22042 at commit [`f379d47`](https://github.com/apache/spark/commit/f379d47e30643fe92b751aa7aa374815ac66a55c).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r212032759
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
           }
         )
       }
    +
    +  test("read Kafka transactional messages: read_committed") {
    +    // This test will cover the following cases:
    +    // 1. the whole batch contains no data messages
    +    // 2. the first offset in a batch is not a committed data message
    +    // 3. the last offset in a batch is not a committed data message
    +    // 4. there is a gap in the middle of a batch
    +
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 1)
    +
    +    val reader = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("kafka.isolation.level", "read_committed")
    +      .option("maxOffsetsPerTrigger", 3)
    +      .option("subscribe", topic)
    +      .option("startingOffsets", "earliest")
    +      // Set a short timeout to make the test fast. When a batch contains no committed date
    +      // messages, "poll" will wait until timeout.
    +      .option("kafkaConsumer.pollTimeoutMs", 5000)
    +    val kafka = reader.load()
    +      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +      .as[(String, String)]
    +    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    +
    +    val clock = new StreamManualClock
    +
    +    val waitUntilBatchProcessed = AssertOnQuery { q =>
    +      eventually(Timeout(streamingTimeout)) {
    +        if (!q.exception.isDefined) {
    +          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    +        }
    +      }
    +      if (q.exception.isDefined) {
    +        throw q.exception.get
    +      }
    +      true
    +    }
    +
    +    val producer = testUtils.createProducer(usingTrascation = true)
    +    try {
    +      producer.initTransactions()
    +
    +      testStream(mapped)(
    +        StartStream(ProcessingTime(100), clock),
    +        waitUntilBatchProcessed,
    +        // 1 from smallest, 1 from middle, 8 from biggest
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages. They should be visible only after being committed.
    +          producer.beginTransaction()
    +          (1 to 5).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        // Should not see any uncommitted messages
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 3: _*), // offset 0, 1, 2
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message]
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages and abort the transaction. They should not be read.
    +          producer.beginTransaction()
    +          (6 to 10).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +          producer.abortTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages again. The consumer should skip the above aborted messages and read
    +          // them.
    +          producer.beginTransaction()
    +          (11 to 15).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 15): _*),  // offset: 15, 16, 17*
    +        WithKafkaProducer(topic, producer) { producer =>
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "16")).get()
    +          producer.commitTransaction()
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "17")).get()
    +          producer.commitTransaction()
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "18")).get()
    +          producer.send(new ProducerRecord[String, String](topic, "19")).get()
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 17): _*), // offset: 18, 19*, 20
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 19): _*), // offset: 21*, 22, 23
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 19): _*) // offset: 24*
    +      )
    +    } finally {
    +      producer.close()
    +    }
    +  }
    +
    +  test("read Kafka transactional messages: read_uncommitted") {
    +    // This test will cover the following cases:
    +    // 1. the whole batch contains no data messages
    +    // 2. the first offset in a batch is not a committed data message
    +    // 3. the last offset in a batch is not a committed data message
    +    // 4. there is a gap in the middle of a batch
    +
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 1)
    +
    +    val reader = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("kafka.isolation.level", "read_uncommitted")
    +      .option("maxOffsetsPerTrigger", 3)
    +      .option("subscribe", topic)
    +      .option("startingOffsets", "earliest")
    +      // Set a short timeout to make the test fast. When a batch contains no committed date
    +      // messages, "poll" will wait until timeout.
    +      .option("kafkaConsumer.pollTimeoutMs", 5000)
    +    val kafka = reader.load()
    +      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +      .as[(String, String)]
    +    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    +
    +    val clock = new StreamManualClock
    +
    +    val waitUntilBatchProcessed = AssertOnQuery { q =>
    +      eventually(Timeout(streamingTimeout)) {
    +        if (!q.exception.isDefined) {
    +          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    +        }
    +      }
    +      if (q.exception.isDefined) {
    +        throw q.exception.get
    +      }
    +      true
    +    }
    +
    +    val producer = testUtils.createProducer(usingTrascation = true)
    +    try {
    +      producer.initTransactions()
    +
    +      testStream(mapped)(
    +        StartStream(ProcessingTime(100), clock),
    +        waitUntilBatchProcessed,
    +        // 1 from smallest, 1 from middle, 8 from biggest
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages. They should be visible only after being committed.
    +          producer.beginTransaction()
    +          (1 to 5).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 3: _*), // offset 0, 1, 2
    --- End diff --
    
    > Why only 3 records when 1 to 5 has been sent already and we are reading uncommitted data?
    
    I'm using `maxOffsetsPerTrigger = 3` to cut the batches on purpose. Otherwise, it's really hard to cover all of cases. 
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #94446 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94446/testReport)** for PR 22042 at commit [`dfea7e3`](https://github.com/apache/spark/commit/dfea7e363ef479c3783171bc3644be61d74beee7).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211801549
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
           offset: Long,
    --- End diff --
    
    Maybe rename this method to fetchRecord, to make it consistent with return type.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    cc @tdas 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r210423180
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -91,6 +90,17 @@ private[kafka010] case class InternalKafkaConsumer(
         kafkaParams: ju.Map[String, Object]) extends Logging {
       import InternalKafkaConsumer._
     
    +  /**
    +   * The internal object returned by the `fetchData` method. If `record` is empty, it means it is
    +   * invisible (either a transaction message, or an aborted message when the consumer's
    +   * `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch
    +   * instead.
    +   */
    +  private case class FetchedRecord(
    +    record: Option[ConsumerRecord[Array[Byte], Array[Byte]]],
    --- End diff --
    
    Can;t we reuse the objects here. And do we need to have an Option, thus creating a lot of Option objects all the time?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #94809 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94809/testReport)** for PR 22042 at commit [`a9b00b4`](https://github.com/apache/spark/commit/a9b00b4a22f0b6b364cd1b35e2d99923d8b233dc).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211786471
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -250,33 +294,42 @@ private[kafka010] case class InternalKafkaConsumer(
           offset: Long,
           untilOffset: Long,
           pollTimeoutMs: Long,
    -      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
    -      // This is the first fetch, or the last pre-fetched data has been drained.
    +      failOnDataLoss: Boolean): FetchedRecord = {
    +    if (offset != nextOffsetInFetchedData) {
    +      // This is the first fetch, or the fetched data has been reset.
           // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
    -      seek(offset)
    -      poll(pollTimeoutMs)
    +      poll(offset, pollTimeoutMs)
    +    } else if (!fetchedData.hasNext) {
    +      // The last pre-fetched data has been drained.
    +      if (offset < offsetAfterPoll) {
    --- End diff --
    
    this is the place preventing me from making `offsetAfterPoll` be a local var.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #94808 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94808/testReport)** for PR 22042 at commit [`baef29f`](https://github.com/apache/spark/commit/baef29f2983560c8010681c9bb7e74f711c8f2e7).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94441/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r209477156
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
           untilOffset: Long,
           pollTimeoutMs: Long,
           failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
    -      // This is the first fetch, or the last pre-fetched data has been drained.
    +    if (offset != nextOffsetInFetchedData) {
    +      // This is the first fetch, or the fetched data has been reset.
           // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
           seek(offset)
           poll(pollTimeoutMs)
    +    } else if (!fetchedData.hasNext) {
    +      // The last pre-fetched data has been drained.
    +      if (offset < offsetAfterPoll) {
    +        // Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
    +        resetFetchedData()
    +        throw new MissingOffsetException(offset, offsetAfterPoll)
    +      } else {
    +        seek(offset)
    +        poll(pollTimeoutMs)
    +      }
         }
     
         if (!fetchedData.hasNext()) {
    -      // We cannot fetch anything after `poll`. Two possible cases:
    +      // We cannot fetch anything after `poll`. Three possible cases:
           // - `offset` is out of range so that Kafka returns nothing. Just throw
           // `OffsetOutOfRangeException` to let the caller handle it.
           // - Cannot fetch any data before timeout. TimeoutException will be thrown.
    +      // - Fetched something but all of them are not valid date messages. In this case, the position
    +      //   will be changed and we can use it to determine this case.
           val range = getAvailableOffsetRange()
           if (offset < range.earliest || offset >= range.latest) {
             throw new OffsetOutOfRangeException(
               Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
    -      } else {
    +      } else if (offsetBeforePoll == offsetAfterPoll) {
    --- End diff --
    
    Just to be clear, can this happen only if there is a timeout?? And if so then why push this condition and exception into the poll() method thus simplifying this method?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95236/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #95063 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95063/testReport)** for PR 22042 at commit [`a06742f`](https://github.com/apache/spark/commit/a06742fd3d19c3ee6d9c957b446bc5017be009bc).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2454/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r212521083
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -239,56 +335,74 @@ private[kafka010] case class InternalKafkaConsumer(
       }
     
       /**
    -   * Get the record for the given offset if available. Otherwise it will either throw error
    -   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
    -   * or null.
    +   * Get the fetched record for the given offset if available.
    +   *
    +   * If the record is invisible (either a  transaction message, or an aborted message when the
    +   * consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the
    +   * next offset to fetch.
    +   *
    +   * This method also will try the best to detect data loss. If `failOnDataLoss` is true`, it will
    +   * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
    +   * method will return `null` if the next available record is within [offset, untilOffset).
        *
        * @throws OffsetOutOfRangeException if `offset` is out of range
        * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
        */
    -  private def fetchData(
    +  private def fetchRecord(
           offset: Long,
           untilOffset: Long,
           pollTimeoutMs: Long,
    -      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
    -      // This is the first fetch, or the last pre-fetched data has been drained.
    -      // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
    -      seek(offset)
    -      poll(pollTimeoutMs)
    -    }
    -
    -    if (!fetchedData.hasNext()) {
    -      // We cannot fetch anything after `poll`. Two possible cases:
    -      // - `offset` is out of range so that Kafka returns nothing. Just throw
    -      // `OffsetOutOfRangeException` to let the caller handle it.
    -      // - Cannot fetch any data before timeout. TimeoutException will be thrown.
    -      val range = getAvailableOffsetRange()
    -      if (offset < range.earliest || offset >= range.latest) {
    -        throw new OffsetOutOfRangeException(
    -          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
    +      failOnDataLoss: Boolean): FetchedRecord = {
    +    if (offset != fetchedData.nextOffsetInFetchedData) {
    +      // This is the first fetch, or the fetched data has been reset.
    +      // Fetch records from Kafka and update `fetchedData`.
    +      fetchData(offset, pollTimeoutMs)
    +    } else if (!fetchedData.hasNext) { // The last pre-fetched data has been drained.
    +      if (offset < fetchedData.offsetAfterPoll) {
    +        // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask
    +        // the next call to start from `fetchedData.offsetAfterPoll`.
    +        fetchedData.reset()
    +        return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
           } else {
    -        throw new TimeoutException(
    -          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
    +        // Fetch records from Kafka and update `fetchedData`.
    +        fetchData(offset, pollTimeoutMs)
           }
    +    }
    +
    +    if (!fetchedData.hasNext) {
    +      // When we reach here, we have already tried to poll from Kafka. As `fetchedData` is still
    +      // empty, all messages in [offset, fetchedData.offsetAfterPoll) are invisible. Return a
    +      // record to ask the next call to start from `fetchedData.offsetAfterPoll`.
    +      assert(offset <= fetchedData.offsetAfterPoll,
    +        s"seek to $offset and poll but the offset was reset to ${fetchedData.offsetAfterPoll}")
    +      fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
         } else {
           val record = fetchedData.next()
    -      nextOffsetInFetchedData = record.offset + 1
           // In general, Kafka uses the specified offset as the start point, and tries to fetch the next
           // available offset. Hence we need to handle offset mismatch.
           if (record.offset > offset) {
    +        val range = getAvailableOffsetRange()
    +        if (range.earliest <= offset) {
    +          // `offset` is still valid but the corresponding message is invisible. We should skip it
    +          // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of
    +          // `fetchRecord` can just return `record` directly.
    +          fetchedData.previous()
    +          return fetchedRecord.withRecord(null, record.offset)
    +        }
             // This may happen when some records aged out but their offsets already got verified
             if (failOnDataLoss) {
               reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
               // Never happen as "reportDataLoss" will throw an exception
    -          null
    +          throw new IllegalStateException(
    +            "reportDataLoss didn't throw an exception when 'failOnDataLoss' is true")
             } else {
               if (record.offset >= untilOffset) {
                 reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
    -            null
    +            // Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
    +            fetchedRecord.withRecord(null, untilOffset)
               } else {
                 reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
    -            record
    +            fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
               }
    --- End diff --
    
    nit: This can be unnested. 
    if  ... else { if ... else ... } -> if ... else if .. else 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r210985375
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -346,11 +385,40 @@ private[kafka010] case class InternalKafkaConsumer(
         consumer.seek(topicPartition, offset)
       }
     
    -  private def poll(pollTimeoutMs: Long): Unit = {
    +  /**
    +   * Poll messages from Kafka starting from `offset` and set `fetchedData` and `offsetAfterPoll`.
    +   * `fetchedData` may be empty if the Kafka fetches some messages but all of them are not visible
    +   * messages (either transaction messages, or aborted messages when `isolation.level` is
    +   * `read_committed`).
    +   *
    +   * @throws OffsetOutOfRangeException if `offset` is out of range.
    +   * @throws TimeoutException if the consumer position is not changed after polling. It means the
    +   *                          consumer polls nothing before timeout.
    +   */
    +  private def poll(offset: Long, pollTimeoutMs: Long): Unit = {
    +    seek(offset)
         val p = consumer.poll(pollTimeoutMs)
         val r = p.records(topicPartition)
         logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
    -    fetchedData = r.iterator
    +    offsetAfterPoll = consumer.position(topicPartition)
    --- End diff --
    
    I strongly think that this should not be a var, rather a clear return value. we have been burnt by too many mutable vars/defs (see all the flakiness caused by the structured ProgressReporter) and we should consciously try to improve this everywhere by not having vars all over the place.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Thanks! Merging to master.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211805409
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
           }
         )
       }
    +
    +  test("read Kafka transactional messages: read_committed") {
    +    // This test will cover the following cases:
    +    // 1. the whole batch contains no data messages
    +    // 2. the first offset in a batch is not a committed data message
    +    // 3. the last offset in a batch is not a committed data message
    +    // 4. there is a gap in the middle of a batch
    +
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 1)
    +
    +    val reader = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("kafka.isolation.level", "read_committed")
    +      .option("maxOffsetsPerTrigger", 3)
    +      .option("subscribe", topic)
    +      .option("startingOffsets", "earliest")
    +      // Set a short timeout to make the test fast. When a batch contains no committed date
    +      // messages, "poll" will wait until timeout.
    +      .option("kafkaConsumer.pollTimeoutMs", 5000)
    +    val kafka = reader.load()
    +      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +      .as[(String, String)]
    +    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    +
    +    val clock = new StreamManualClock
    +
    +    val waitUntilBatchProcessed = AssertOnQuery { q =>
    +      eventually(Timeout(streamingTimeout)) {
    +        if (!q.exception.isDefined) {
    +          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    +        }
    +      }
    +      if (q.exception.isDefined) {
    +        throw q.exception.get
    +      }
    +      true
    +    }
    +
    +    val producer = testUtils.createProducer(usingTrascation = true)
    +    try {
    +      producer.initTransactions()
    +
    +      testStream(mapped)(
    +        StartStream(ProcessingTime(100), clock),
    +        waitUntilBatchProcessed,
    +        // 1 from smallest, 1 from middle, 8 from biggest
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages. They should be visible only after being committed.
    +          producer.beginTransaction()
    +          (1 to 5).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        // Should not see any uncommitted messages
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 3: _*), // offset 0, 1, 2
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message]
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages and abort the transaction. They should not be read.
    +          producer.beginTransaction()
    +          (6 to 10).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +          producer.abortTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages again. The consumer should skip the above aborted messages and read
    +          // them.
    +          producer.beginTransaction()
    +          (11 to 15).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 15): _*),  // offset: 15, 16, 17*
    +        WithKafkaProducer(topic, producer) { producer =>
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "16")).get()
    +          producer.commitTransaction()
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "17")).get()
    +          producer.commitTransaction()
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "18")).get()
    +          producer.send(new ProducerRecord[String, String](topic, "19")).get()
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 17): _*), // offset: 18, 19*, 20
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 19): _*), // offset: 21*, 22, 23
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 19): _*) // offset: 24*
    +      )
    +    } finally {
    +      producer.close()
    +    }
    +  }
    +
    +  test("read Kafka transactional messages: read_uncommitted") {
    +    // This test will cover the following cases:
    +    // 1. the whole batch contains no data messages
    +    // 2. the first offset in a batch is not a committed data message
    +    // 3. the last offset in a batch is not a committed data message
    +    // 4. there is a gap in the middle of a batch
    +
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 1)
    +
    +    val reader = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("kafka.isolation.level", "read_uncommitted")
    +      .option("maxOffsetsPerTrigger", 3)
    +      .option("subscribe", topic)
    +      .option("startingOffsets", "earliest")
    +      // Set a short timeout to make the test fast. When a batch contains no committed date
    +      // messages, "poll" will wait until timeout.
    +      .option("kafkaConsumer.pollTimeoutMs", 5000)
    +    val kafka = reader.load()
    +      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +      .as[(String, String)]
    +    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    +
    +    val clock = new StreamManualClock
    +
    +    val waitUntilBatchProcessed = AssertOnQuery { q =>
    +      eventually(Timeout(streamingTimeout)) {
    +        if (!q.exception.isDefined) {
    +          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    +        }
    +      }
    +      if (q.exception.isDefined) {
    +        throw q.exception.get
    +      }
    +      true
    +    }
    +
    +    val producer = testUtils.createProducer(usingTrascation = true)
    --- End diff --
    
    You could define a testWithProducer method and wrap the finally in it. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211802489
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -288,7 +385,7 @@ private[kafka010] case class InternalKafkaConsumer(
                 null
    --- End diff --
    
    We should not be returning null EVER when we are using `FetchedRecord.record = null` to signify lack of record.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211801968
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
           offset: Long,
           untilOffset: Long,
           pollTimeoutMs: Long,
    -      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
    -      // This is the first fetch, or the last pre-fetched data has been drained.
    +      failOnDataLoss: Boolean): FetchedRecord = {
    +    if (offset != fetchedData.nextOffsetInFetchedData) {
    +      // This is the first fetch, or the fetched data has been reset.
           // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
    -      seek(offset)
    -      poll(pollTimeoutMs)
    -    }
    -
    -    if (!fetchedData.hasNext()) {
    -      // We cannot fetch anything after `poll`. Two possible cases:
    -      // - `offset` is out of range so that Kafka returns nothing. Just throw
    -      // `OffsetOutOfRangeException` to let the caller handle it.
    -      // - Cannot fetch any data before timeout. TimeoutException will be thrown.
    -      val range = getAvailableOffsetRange()
    -      if (offset < range.earliest || offset >= range.latest) {
    -        throw new OffsetOutOfRangeException(
    -          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
    +      poll(offset, pollTimeoutMs)
    +    } else if (!fetchedData.hasNext) {
    +      // The last pre-fetched data has been drained.
    --- End diff --
    
    nit: I was confused with whether the above comment was for the `else if` above it or for the `if` below it. Maybe inline it with the `else if`. Or leave a line after it, before the `if` below.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r209479417
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
           untilOffset: Long,
           pollTimeoutMs: Long,
           failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
    -      // This is the first fetch, or the last pre-fetched data has been drained.
    +    if (offset != nextOffsetInFetchedData) {
    +      // This is the first fetch, or the fetched data has been reset.
           // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
           seek(offset)
           poll(pollTimeoutMs)
    +    } else if (!fetchedData.hasNext) {
    +      // The last pre-fetched data has been drained.
    +      if (offset < offsetAfterPoll) {
    +        // Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
    +        resetFetchedData()
    +        throw new MissingOffsetException(offset, offsetAfterPoll)
    --- End diff --
    
    So MissingOffsetRange is only used to signal that some offset may be missing due to control messages and nothing else. And the higher function (i.e. `get`) just handles it by resetting the fetched offsets. Why not let this `fetchData` method handle the situation instead of creating a new exception just for this?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2404/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r209478033
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
           untilOffset: Long,
           pollTimeoutMs: Long,
           failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
    -      // This is the first fetch, or the last pre-fetched data has been drained.
    +    if (offset != nextOffsetInFetchedData) {
    +      // This is the first fetch, or the fetched data has been reset.
           // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
           seek(offset)
           poll(pollTimeoutMs)
    +    } else if (!fetchedData.hasNext) {
    +      // The last pre-fetched data has been drained.
    +      if (offset < offsetAfterPoll) {
    +        // Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
    +        resetFetchedData()
    +        throw new MissingOffsetException(offset, offsetAfterPoll)
    +      } else {
    +        seek(offset)
    +        poll(pollTimeoutMs)
    +      }
         }
     
         if (!fetchedData.hasNext()) {
    -      // We cannot fetch anything after `poll`. Two possible cases:
    +      // We cannot fetch anything after `poll`. Three possible cases:
           // - `offset` is out of range so that Kafka returns nothing. Just throw
           // `OffsetOutOfRangeException` to let the caller handle it.
           // - Cannot fetch any data before timeout. TimeoutException will be thrown.
    +      // - Fetched something but all of them are not valid date messages. In this case, the position
    +      //   will be changed and we can use it to determine this case.
           val range = getAvailableOffsetRange()
           if (offset < range.earliest || offset >= range.latest) {
             throw new OffsetOutOfRangeException(
               Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
    -      } else {
    +      } else if (offsetBeforePoll == offsetAfterPoll) {
             throw new TimeoutException(
               s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
    +      } else {
    +        assert(offset <= offsetAfterPoll,
    +          s"seek to $offset and poll but the offset was reset to $offsetAfterPoll")
    +        throw new MissingOffsetException(offset, offsetAfterPoll)
           }
         } else {
    --- End diff --
    
    Let's remove this else and reduce the condition nesting. The previous `if` statement always ends in an exception, so we can remove this else.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r209473316
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -95,6 +106,10 @@ private[kafka010] case class InternalKafkaConsumer(
         ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
       @volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET
     
    +  @volatile private var offsetBeforePoll: Long = UNKNOWN_OFFSET
    --- End diff --
    
    Can you add some docs to explain what these 2 vars siginify and why these vars are needed?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #95319 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95319/testReport)** for PR 22042 at commit [`ea804cf`](https://github.com/apache/spark/commit/ea804cfe840196519cc9444be9bedf03d10aa11a).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2596/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211801254
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
           offset: Long,
           untilOffset: Long,
           pollTimeoutMs: Long,
    -      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
    -      // This is the first fetch, or the last pre-fetched data has been drained.
    +      failOnDataLoss: Boolean): FetchedRecord = {
    +    if (offset != fetchedData.nextOffsetInFetchedData) {
    +      // This is the first fetch, or the fetched data has been reset.
           // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
    -      seek(offset)
    -      poll(pollTimeoutMs)
    -    }
    -
    -    if (!fetchedData.hasNext()) {
    -      // We cannot fetch anything after `poll`. Two possible cases:
    -      // - `offset` is out of range so that Kafka returns nothing. Just throw
    -      // `OffsetOutOfRangeException` to let the caller handle it.
    -      // - Cannot fetch any data before timeout. TimeoutException will be thrown.
    -      val range = getAvailableOffsetRange()
    -      if (offset < range.earliest || offset >= range.latest) {
    -        throw new OffsetOutOfRangeException(
    -          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
    +      poll(offset, pollTimeoutMs)
    +    } else if (!fetchedData.hasNext) {
    +      // The last pre-fetched data has been drained.
    +      if (offset < fetchedData.offsetAfterPoll) {
    +        // Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
    +        fetchedData.reset()
    +        return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
           } else {
    -        throw new TimeoutException(
    -          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
    +        poll(offset, pollTimeoutMs)
           }
    +    }
    +
    +    if (!fetchedData.hasNext) {
    +      assert(offset <= fetchedData.offsetAfterPoll,
    --- End diff --
    
    Add comments here on what this case means.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r212504622
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala ---
    @@ -331,6 +331,7 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(
         offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
     
       private val rangeToRead = resolveRange(offsetRange)
    +
    --- End diff --
    
    unnecessary 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2215/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r210422521
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
     import org.apache.spark.sql.kafka010.KafkaSourceProvider._
     import org.apache.spark.util.UninterruptibleThread
     
    -/**
    - * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
    - * This means it's either a transaction (commit or abort) marker, or an aborted message if
    - * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
    - * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch.
    - */
    -private[kafka010] class MissingOffsetException(
    -    val offset: Long,
    -    val nextOffsetToFetch: Long) extends Exception(
    -  s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch")
    -
     private[kafka010] sealed trait KafkaDataConsumer {
       /**
    -   * Get the record for the given offset if available. Otherwise it will either throw error
    -   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
    -   * or null.
    +   * Get the record for the given offset if available.
    +   *
    +   * If the record is invisible (either a
    +   * transaction message, or an aborted message when the consumer's `isolation.level` is
    +   * `read_committed`), it will be skipped and this method will try to fetch next available record
    +   * within [offset, untilOffset).
    +   *
    +   * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will
    --- End diff --
    
    if failOnDataLoss is *true* then it should throw exception... isnt it?
    
    nit: try its best



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #94446 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94446/testReport)** for PR 22042 at commit [`dfea7e3`](https://github.com/apache/spark/commit/dfea7e363ef479c3783171bc3644be61d74beee7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r212522432
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -161,6 +161,22 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
           s"AddKafkaData(topics = $topics, data = $data, message = $message)"
       }
     
    +  object WithKafkaProducer {
    +    def apply(
    +        topic: String,
    +        producer: KafkaProducer[String, String])(
    +        func: KafkaProducer[String, String] => Unit): AssertOnQuery = {
    --- End diff --
    
    nit: AssertOnQuery -> StreamAction


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    > This patch fails Spark unit tests.
    
    This is the flaky test I fixed in #22230
    
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    LGTM.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r209473392
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -31,6 +31,17 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
     import org.apache.spark.sql.kafka010.KafkaSourceProvider._
     import org.apache.spark.util.UninterruptibleThread
     
    +/**
    + * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
    + * This means it's either a transaction (commit or abort) marker, or an aborted message if
    + * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
    + * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch.
    + */
    +private[kafka010] class MissingOffsetException(
    --- End diff --
    
    nit: Is this meant to be used outside this KafkaDataConsumer class? If not, then maybe make it an inner class to KafkaDataConsumer.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #95319 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95319/testReport)** for PR 22042 at commit [`ea804cf`](https://github.com/apache/spark/commit/ea804cfe840196519cc9444be9bedf03d10aa11a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211801632
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -346,11 +437,40 @@ private[kafka010] case class InternalKafkaConsumer(
         consumer.seek(topicPartition, offset)
       }
     
    -  private def poll(pollTimeoutMs: Long): Unit = {
    +  /**
    +   * Poll messages from Kafka starting from `offset` and set `fetchedData` and `offsetAfterPoll`.
    +   * `fetchedData` may be empty if the Kafka fetches some messages but all of them are not visible
    +   * messages (either transaction messages, or aborted messages when `isolation.level` is
    +   * `read_committed`).
    +   *
    +   * @throws OffsetOutOfRangeException if `offset` is out of range.
    +   * @throws TimeoutException if the consumer position is not changed after polling. It means the
    +   *                          consumer polls nothing before timeout.
    +   */
    +  private def poll(offset: Long, pollTimeoutMs: Long): Unit = {
    --- End diff --
    
    Maybe rename this method to be consistent with that it does .... fetch data. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #95236 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95236/testReport)** for PR 22042 at commit [`7a02921`](https://github.com/apache/spark/commit/7a02921950cda865e3cd45f1d1635212c2f707c0).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #95056 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95056/testReport)** for PR 22042 at commit [`f379d47`](https://github.com/apache/spark/commit/f379d47e30643fe92b751aa7aa374815ac66a55c).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211805275
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
           }
         )
       }
    +
    +  test("read Kafka transactional messages: read_committed") {
    +    // This test will cover the following cases:
    +    // 1. the whole batch contains no data messages
    +    // 2. the first offset in a batch is not a committed data message
    +    // 3. the last offset in a batch is not a committed data message
    +    // 4. there is a gap in the middle of a batch
    +
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 1)
    +
    +    val reader = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("kafka.isolation.level", "read_committed")
    +      .option("maxOffsetsPerTrigger", 3)
    +      .option("subscribe", topic)
    +      .option("startingOffsets", "earliest")
    +      // Set a short timeout to make the test fast. When a batch contains no committed date
    +      // messages, "poll" will wait until timeout.
    +      .option("kafkaConsumer.pollTimeoutMs", 5000)
    +    val kafka = reader.load()
    +      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +      .as[(String, String)]
    +    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    +
    +    val clock = new StreamManualClock
    +
    +    val waitUntilBatchProcessed = AssertOnQuery { q =>
    +      eventually(Timeout(streamingTimeout)) {
    +        if (!q.exception.isDefined) {
    +          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    +        }
    +      }
    +      if (q.exception.isDefined) {
    +        throw q.exception.get
    +      }
    +      true
    +    }
    +
    +    val producer = testUtils.createProducer(usingTrascation = true)
    +    try {
    +      producer.initTransactions()
    +
    +      testStream(mapped)(
    +        StartStream(ProcessingTime(100), clock),
    +        waitUntilBatchProcessed,
    +        // 1 from smallest, 1 from middle, 8 from biggest
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages. They should be visible only after being committed.
    +          producer.beginTransaction()
    +          (1 to 5).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        // Should not see any uncommitted messages
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 3: _*), // offset 0, 1, 2
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message]
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages and abort the transaction. They should not be read.
    +          producer.beginTransaction()
    +          (6 to 10).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +          producer.abortTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages again. The consumer should skip the above aborted messages and read
    +          // them.
    +          producer.beginTransaction()
    +          (11 to 15).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 15): _*),  // offset: 15, 16, 17*
    --- End diff --
    
    Use CheckNewAnswer instead cumulative CheckAnswer.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1961/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r210422755
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
     import org.apache.spark.sql.kafka010.KafkaSourceProvider._
     import org.apache.spark.util.UninterruptibleThread
     
    -/**
    - * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
    - * This means it's either a transaction (commit or abort) marker, or an aborted message if
    - * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
    - * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch.
    - */
    -private[kafka010] class MissingOffsetException(
    -    val offset: Long,
    -    val nextOffsetToFetch: Long) extends Exception(
    -  s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch")
    -
     private[kafka010] sealed trait KafkaDataConsumer {
       /**
    -   * Get the record for the given offset if available. Otherwise it will either throw error
    -   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
    -   * or null.
    +   * Get the record for the given offset if available.
    +   *
    +   * If the record is invisible (either a
    +   * transaction message, or an aborted message when the consumer's `isolation.level` is
    +   * `read_committed`), it will be skipped and this method will try to fetch next available record
    +   * within [offset, untilOffset).
    +   *
    +   * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will
    +   * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `true`, this
    --- End diff --
    
    Will we throw an exception even when its a control message and there is no real data loss?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #95297 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95297/testReport)** for PR 22042 at commit [`7a02921`](https://github.com/apache/spark/commit/7a02921950cda865e3cd45f1d1635212c2f707c0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94808/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r209479551
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala ---
    @@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD(
         offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray
       }
     
    -  override def count(): Long = offsetRanges.map(_.size).sum
    --- End diff --
    
    Goooood catch. That would have never occurred to me!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211803267
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -337,6 +338,7 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader(
           val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
           if (record != null) {
             nextRow = converter.toUnsafeRow(record)
    +        nextOffset = record.offset + 1
    --- End diff --
    
    why this change?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211805821
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
           }
         )
       }
    +
    +  test("read Kafka transactional messages: read_committed") {
    +    // This test will cover the following cases:
    +    // 1. the whole batch contains no data messages
    +    // 2. the first offset in a batch is not a committed data message
    +    // 3. the last offset in a batch is not a committed data message
    +    // 4. there is a gap in the middle of a batch
    +
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 1)
    +
    +    val reader = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("kafka.isolation.level", "read_committed")
    +      .option("maxOffsetsPerTrigger", 3)
    +      .option("subscribe", topic)
    +      .option("startingOffsets", "earliest")
    +      // Set a short timeout to make the test fast. When a batch contains no committed date
    +      // messages, "poll" will wait until timeout.
    +      .option("kafkaConsumer.pollTimeoutMs", 5000)
    +    val kafka = reader.load()
    +      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +      .as[(String, String)]
    +    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    +
    +    val clock = new StreamManualClock
    +
    +    val waitUntilBatchProcessed = AssertOnQuery { q =>
    +      eventually(Timeout(streamingTimeout)) {
    +        if (!q.exception.isDefined) {
    +          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    +        }
    +      }
    +      if (q.exception.isDefined) {
    +        throw q.exception.get
    +      }
    +      true
    +    }
    +
    +    val producer = testUtils.createProducer(usingTrascation = true)
    +    try {
    +      producer.initTransactions()
    +
    +      testStream(mapped)(
    +        StartStream(ProcessingTime(100), clock),
    +        waitUntilBatchProcessed,
    +        // 1 from smallest, 1 from middle, 8 from biggest
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages. They should be visible only after being committed.
    +          producer.beginTransaction()
    +          (1 to 5).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        // Should not see any uncommitted messages
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 3: _*), // offset 0, 1, 2
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message]
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages and abort the transaction. They should not be read.
    +          producer.beginTransaction()
    +          (6 to 10).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +          producer.abortTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages again. The consumer should skip the above aborted messages and read
    +          // them.
    +          producer.beginTransaction()
    +          (11 to 15).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 15): _*),  // offset: 15, 16, 17*
    +        WithKafkaProducer(topic, producer) { producer =>
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "16")).get()
    +          producer.commitTransaction()
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "17")).get()
    +          producer.commitTransaction()
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "18")).get()
    +          producer.send(new ProducerRecord[String, String](topic, "19")).get()
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 17): _*), // offset: 18, 19*, 20
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 19): _*), // offset: 21*, 22, 23
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 19): _*) // offset: 24*
    +      )
    +    } finally {
    +      producer.close()
    +    }
    +  }
    +
    +  test("read Kafka transactional messages: read_uncommitted") {
    +    // This test will cover the following cases:
    +    // 1. the whole batch contains no data messages
    +    // 2. the first offset in a batch is not a committed data message
    +    // 3. the last offset in a batch is not a committed data message
    +    // 4. there is a gap in the middle of a batch
    +
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 1)
    +
    +    val reader = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("kafka.isolation.level", "read_uncommitted")
    +      .option("maxOffsetsPerTrigger", 3)
    +      .option("subscribe", topic)
    +      .option("startingOffsets", "earliest")
    +      // Set a short timeout to make the test fast. When a batch contains no committed date
    +      // messages, "poll" will wait until timeout.
    +      .option("kafkaConsumer.pollTimeoutMs", 5000)
    +    val kafka = reader.load()
    +      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +      .as[(String, String)]
    +    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    +
    +    val clock = new StreamManualClock
    +
    +    val waitUntilBatchProcessed = AssertOnQuery { q =>
    +      eventually(Timeout(streamingTimeout)) {
    +        if (!q.exception.isDefined) {
    +          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    +        }
    +      }
    +      if (q.exception.isDefined) {
    +        throw q.exception.get
    +      }
    +      true
    +    }
    +
    +    val producer = testUtils.createProducer(usingTrascation = true)
    +    try {
    +      producer.initTransactions()
    +
    +      testStream(mapped)(
    +        StartStream(ProcessingTime(100), clock),
    +        waitUntilBatchProcessed,
    +        // 1 from smallest, 1 from middle, 8 from biggest
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages. They should be visible only after being committed.
    +          producer.beginTransaction()
    +          (1 to 5).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 3: _*), // offset 0, 1, 2
    --- End diff --
    
    Why only 3 records when 1 to 5 has been sent already and we are reading uncommitted data?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211803763
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -160,6 +160,23 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
           s"AddKafkaData(topics = $topics, data = $data, message = $message)"
       }
     
    +  object WithKafkaProducer {
    +    def apply(
    +        topic: String,
    +        producer: KafkaProducer[String, String])(
    +        func: KafkaProducer[String, String] => Unit): AssertOnQuery = {
    +      AssertOnQuery(_ => {
    --- End diff --
    
    nit: use Execute


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    **[Test build #95115 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95115/testReport)** for PR 22042 at commit [`603e0bc`](https://github.com/apache/spark/commit/603e0bc9cc822ec3151159a88a521ac063932f11).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r208676022
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala ---
    @@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD(
         offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray
       }
     
    -  override def count(): Long = offsetRanges.map(_.size).sum
    --- End diff --
    
    The assumption in these methods is no longer right, so remove them.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94446/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211786163
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
     import org.apache.spark.sql.kafka010.KafkaSourceProvider._
     import org.apache.spark.util.UninterruptibleThread
     
    -/**
    - * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
    - * This means it's either a transaction (commit or abort) marker, or an aborted message if
    - * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
    - * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch.
    - */
    -private[kafka010] class MissingOffsetException(
    -    val offset: Long,
    -    val nextOffsetToFetch: Long) extends Exception(
    -  s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch")
    -
     private[kafka010] sealed trait KafkaDataConsumer {
       /**
    -   * Get the record for the given offset if available. Otherwise it will either throw error
    -   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
    -   * or null.
    +   * Get the record for the given offset if available.
    +   *
    +   * If the record is invisible (either a
    +   * transaction message, or an aborted message when the consumer's `isolation.level` is
    +   * `read_committed`), it will be skipped and this method will try to fetch next available record
    +   * within [offset, untilOffset).
    +   *
    +   * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will
    +   * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `true`, this
    --- End diff --
    
    > Will we throw an exception even when its a control message and there is no real data loss?
    
    No. `It will be skipped and this method will try to fetch next available record within [offset, untilOffset).`



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2400/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211786183
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
     import org.apache.spark.sql.kafka010.KafkaSourceProvider._
     import org.apache.spark.util.UninterruptibleThread
     
    -/**
    - * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
    - * This means it's either a transaction (commit or abort) marker, or an aborted message if
    - * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
    - * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch.
    - */
    -private[kafka010] class MissingOffsetException(
    -    val offset: Long,
    -    val nextOffsetToFetch: Long) extends Exception(
    -  s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch")
    -
     private[kafka010] sealed trait KafkaDataConsumer {
       /**
    -   * Get the record for the given offset if available. Otherwise it will either throw error
    -   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
    -   * or null.
    +   * Get the record for the given offset if available.
    +   *
    +   * If the record is invisible (either a
    +   * transaction message, or an aborted message when the consumer's `isolation.level` is
    +   * `read_committed`), it will be skipped and this method will try to fetch next available record
    +   * within [offset, untilOffset).
    +   *
    +   * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will
    --- End diff --
    
    Good catch


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22042
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org