Posted to reviews@spark.apache.org by zsxwing <gi...@git.apache.org> on 2018/08/08 17:52:00 UTC
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
GitHub user zsxwing opened a pull request:
https://github.com/apache/spark/pull/22042
[SPARK-25005][SS]Support non-consecutive offsets for Kafka
## What changes were proposed in this pull request?
When a user writes data with Kafka transactions, the offsets in Kafka will be non-consecutive: the log will contain transaction (commit or abort) markers. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the data returned by `poll`. However, as `seekToEnd` may move the offset to one of these missing offsets, there are 4 possible corner cases we need to support:
- The whole batch contains no data messages
- The first offset in a batch is not a committed data message
- The last offset in a batch is not a committed data message
- There is a gap in the middle of a batch
They are all covered by the new unit tests.
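For illustration, here is a minimal standalone sketch (not part of this patch) of how such gaps show up when polling a transactional topic with a plain Kafka consumer; the broker address and topic name are placeholders:

```scala
import java.time.Duration
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")      // placeholder broker
props.put("group.id", "gap-demo")
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")
props.put("isolation.level", "read_committed")        // hide aborted messages

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("transactional-topic", 0) // placeholder topic
consumer.assign(Collections.singletonList(tp))
consumer.seekToBeginning(Collections.singletonList(tp))

val offsets = consumer.poll(Duration.ofSeconds(5)).records(tp).asScala.map(_.offset)
// With transactional producers the offsets can look like 0, 1, 2, 4, 7, ...:
// offset 3 is a commit marker and offsets 5-6 belong to an aborted transaction.
println(offsets.mkString(", "))
consumer.close()
```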
## How was this patch tested?
The new unit tests.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zsxwing/spark kafka-transaction-read
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/22042.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #22042
----
commit dc18a6ff59fe7c48ed188a4eb9a6abf04caee0bd
Author: Shixiong Zhu <zs...@...>
Date: 2018-08-08T17:40:37Z
Support non-consecutive offsets for Kafka
----
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211801676
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
- failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
- if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
- // This is the first fetch, or the last pre-fetched data has been drained.
+ failOnDataLoss: Boolean): FetchedRecord = {
+ if (offset != fetchedData.nextOffsetInFetchedData) {
+ // This is the first fetch, or the fetched data has been reset.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
- seek(offset)
- poll(pollTimeoutMs)
- }
-
- if (!fetchedData.hasNext()) {
- // We cannot fetch anything after `poll`. Two possible cases:
- // - `offset` is out of range so that Kafka returns nothing. Just throw
- // `OffsetOutOfRangeException` to let the caller handle it.
- // - Cannot fetch any data before timeout. TimeoutException will be thrown.
- val range = getAvailableOffsetRange()
- if (offset < range.earliest || offset >= range.latest) {
- throw new OffsetOutOfRangeException(
- Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+ poll(offset, pollTimeoutMs)
--- End diff --
Add a comment noting that this method updates `fetchedData`.
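One possible wording for such a comment (a sketch, not necessarily what was committed):

```scala
/**
 * Poll messages from Kafka starting from `offset`.
 *
 * Note that this method updates `fetchedData` as a side effect: the polled
 * records, the next offset in the fetched data, and the offset after the
 * poll are all stored there for the caller to read afterwards.
 */
```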
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2579/
Test PASSed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2216/
Test PASSed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r212033844
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
@@ -337,6 +338,7 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader(
val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
if (record != null) {
nextRow = converter.toUnsafeRow(record)
+ nextOffset = record.offset + 1
--- End diff --
We should update `nextOffset` to `record.offset + 1` rather than `nextOffset + 1`. Otherwise, it may return duplicate records when `failOnDataLoss` is `false`. I will submit another PR to push this fix to 2.3 as it's a correctness issue.
In addition, we should update `nextOffset` in the `next` method rather than here, as the `get` method is designed to be called multiple times.
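A tiny worked example of the arithmetic, with hypothetical offsets (4 and 5 standing in for a gap left by transaction markers):

```scala
val nextOffset = 4L                // the offset the reader asks for next
val returnedOffset = 6L            // get() skips the gap at 4-5, returns the record at 6

val wrongNext = nextOffset + 1     // 5: points back into the gap; a later get()
                                   // could return offset 6 again -> duplicate record
val rightNext = returnedOffset + 1 // 7: advances past the record just emitted
```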
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r212522664
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -161,6 +161,22 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
s"AddKafkaData(topics = $topics, data = $data, message = $message)"
}
+ object WithKafkaProducer {
--- End diff --
nit: This is not creating a KafkaProducer, unlike most `With***` methods. The point of this is to force synchronization of the consumer, so maybe rename it to `WithOffsetSync { ... }`?
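A sketch of what the renamed helper could look like, reusing `AssertOnQuery` from `StreamTest` (the exact shape here is hypothetical and was left to the author):

```scala
import org.apache.kafka.clients.producer.KafkaProducer

object WithOffsetSync {
  // Runs `func` against the query so the producer can commit/abort messages,
  // forcing a known offset layout on the topic before the next trigger.
  def apply(producer: KafkaProducer[String, String])(
      func: KafkaProducer[String, String] => Unit): StreamAction =
    AssertOnQuery { _ =>
      func(producer)
      true
    }
}
```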
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211804454
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
}
)
}
+
+ test("read Kafka transactional messages: read_committed") {
+ // This test will cover the following cases:
+ // 1. the whole batch contains no data messages
+ // 2. the first offset in a batch is not a committed data message
+ // 3. the last offset in a batch is not a committed data message
+ // 4. there is a gap in the middle of a batch
+
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.isolation.level", "read_committed")
+ .option("maxOffsetsPerTrigger", 3)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ // Set a short timeout to make the test fast. When a batch contains no committed date
+ // messages, "poll" will wait until timeout.
+ .option("kafkaConsumer.pollTimeoutMs", 5000)
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
--- End diff --
Use `Execute`, and add a comment on what this does and why we need it.
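A sketch of the suggested change, assuming `Execute` from `StreamTest` (it runs an arbitrary function against the running query):

```scala
// Blocks until the current batch finishes: the manual clock only advances
// via AdvanceManualClock, so once the stream is waiting on the clock again
// we know the previously triggered batch has been fully processed.
val waitUntilBatchProcessed = Execute { q =>
  eventually(Timeout(streamingTimeout)) {
    if (q.exception.isEmpty) {
      assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    }
  }
  if (q.exception.isDefined) {
    throw q.exception.get
  }
}
```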
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2546/
Test PASSed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211805993
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---
@@ -327,6 +332,14 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
props
}
+ def createProducer(usingTrascation: Boolean): KafkaProducer[String, String] = {
--- End diff --
nit: usingTrascation -> usingTransaction
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94809/
Test PASSed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test FAILed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211795985
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -80,6 +90,72 @@ private[kafka010] case class InternalKafkaConsumer(
kafkaParams: ju.Map[String, Object]) extends Logging {
import InternalKafkaConsumer._
+ /**
+ * The internal object to store the fetched data from Kafka consumer and the next offset to poll.
+ *
+ * @param records the pre-fetched Kafka records.
+ * @param nextOffsetInFetchedData the next offset in `records`. We use this to verify if we should
+ * check if the pre-fetched data is still valid.
+ * @param offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to poll
+ * when `records` is drained.
+ */
+ private case class FetchedData(
+ private var records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
+ var nextOffsetInFetchedData: Long,
--- End diff --
Make this a public getter with a private setter.
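One way to express that idiom in Scala (a sketch, with the remaining fields elided):

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerRecord

private case class FetchedData(
    private var records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    private var _nextOffsetInFetchedData: Long,
    private var _offsetAfterPoll: Long) {

  // Readable from anywhere in the consumer, but only mutable from inside
  // FetchedData itself.
  def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData
  def offsetAfterPoll: Long = _offsetAfterPoll

  def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
    val record = records.next()
    _nextOffsetInFetchedData = record.offset + 1
    record
  }
}
```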
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r209476548
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
untilOffset: Long,
pollTimeoutMs: Long,
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
- if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
- // This is the first fetch, or the last pre-fetched data has been drained.
+ if (offset != nextOffsetInFetchedData) {
+ // This is the first fetch, or the fetched data has been reset.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
seek(offset)
poll(pollTimeoutMs)
+ } else if (!fetchedData.hasNext) {
+ // The last pre-fetched data has been drained.
+ if (offset < offsetAfterPoll) {
+ // Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
+ resetFetchedData()
+ throw new MissingOffsetException(offset, offsetAfterPoll)
+ } else {
+ seek(offset)
+ poll(pollTimeoutMs)
+ }
}
if (!fetchedData.hasNext()) {
- // We cannot fetch anything after `poll`. Two possible cases:
+ // We cannot fetch anything after `poll`. Three possible cases:
// - `offset` is out of range so that Kafka returns nothing. Just throw
// `OffsetOutOfRangeException` to let the caller handle it.
// - Cannot fetch any data before timeout. TimeoutException will be thrown.
+ // - Fetched something but all of them are not valid date messages. In this case, the position
--- End diff --
date => data
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #95063 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95063/testReport)** for PR 22042 at commit [`a06742f`](https://github.com/apache/spark/commit/a06742fd3d19c3ee6d9c957b446bc5017be009bc).
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r212507190
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -160,6 +160,23 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
s"AddKafkaData(topics = $topics, data = $data, message = $message)"
}
+ object WithKafkaProducer {
+ def apply(
+ topic: String,
+ producer: KafkaProducer[String, String])(
--- End diff --
Ping on this comment. Maybe you missed this?
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1958/
Test PASSed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test FAILed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95056/
Test FAILed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211804704
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -160,6 +160,23 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
s"AddKafkaData(topics = $topics, data = $data, message = $message)"
}
+ object WithKafkaProducer {
+ def apply(
+ topic: String,
+ producer: KafkaProducer[String, String])(
--- End diff --
Why pass producer when all you are doing is to pass it to the function. The function can do it on its own.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #95115 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95115/testReport)** for PR 22042 at commit [`603e0bc`](https://github.com/apache/spark/commit/603e0bc9cc822ec3151159a88a521ac063932f11).
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211802112
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
- failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
- if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
- // This is the first fetch, or the last pre-fetched data has been drained.
+ failOnDataLoss: Boolean): FetchedRecord = {
+ if (offset != fetchedData.nextOffsetInFetchedData) {
+ // This is the first fetch, or the fetched data has been reset.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
- seek(offset)
- poll(pollTimeoutMs)
- }
-
- if (!fetchedData.hasNext()) {
- // We cannot fetch anything after `poll`. Two possible cases:
- // - `offset` is out of range so that Kafka returns nothing. Just throw
- // `OffsetOutOfRangeException` to let the caller handle it.
- // - Cannot fetch any data before timeout. TimeoutException will be thrown.
- val range = getAvailableOffsetRange()
- if (offset < range.earliest || offset >= range.latest) {
- throw new OffsetOutOfRangeException(
- Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+ poll(offset, pollTimeoutMs)
+ } else if (!fetchedData.hasNext) {
+ // The last pre-fetched data has been drained.
+ if (offset < fetchedData.offsetAfterPoll) {
+ // Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
--- End diff --
"skip them" is confusing. What does it mean to skip? Why are we still returning something.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r209476712
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -347,9 +391,12 @@ private[kafka010] case class InternalKafkaConsumer(
}
private def poll(pollTimeoutMs: Long): Unit = {
+ offsetBeforePoll = consumer.position(topicPartition)
--- End diff --
This variable `offsetBeforePoll` seems to be used only to identify whether data was actually fetched in a poll and nothing else. Rather than defining another var (there are already many, and they are already confusing), why not just return a boolean from `poll` that is true or false depending on whether the poll moved anything?
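A sketch of that suggestion, assuming the surrounding `consumer`, `topicPartition`, and `fetchedData` fields of this class:

```scala
/**
 * Poll messages from Kafka and load them into `fetchedData`. Returns true
 * if the consumer's position moved, i.e. the poll actually fetched
 * something (possibly only markers or aborted messages).
 */
private def poll(pollTimeoutMs: Long): Boolean = {
  val offsetBeforePoll = consumer.position(topicPartition) // local, not a field
  val records = consumer.poll(pollTimeoutMs).records(topicPartition)
  fetchedData = records.listIterator
  consumer.position(topicPartition) != offsetBeforePoll
}
```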
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211805733
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
}
)
}
+
+ test("read Kafka transactional messages: read_committed") {
+ // This test will cover the following cases:
+ // 1. the whole batch contains no data messages
+ // 2. the first offset in a batch is not a committed data message
+ // 3. the last offset in a batch is not a committed data message
+ // 4. there is a gap in the middle of a batch
+
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.isolation.level", "read_committed")
+ .option("maxOffsetsPerTrigger", 3)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ // Set a short timeout to make the test fast. When a batch contains no committed date
+ // messages, "poll" will wait until timeout.
+ .option("kafkaConsumer.pollTimeoutMs", 5000)
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ val producer = testUtils.createProducer(usingTrascation = true)
+ try {
+ producer.initTransactions()
+
+ testStream(mapped)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // 1 from smallest, 1 from middle, 8 from biggest
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages. They should be visible only after being committed.
+ producer.beginTransaction()
+ (1 to 5).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // Should not see any uncommitted messages
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 3: _*), // offset 0, 1, 2
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message]
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages and abort the transaction. They should not be read.
+ producer.beginTransaction()
+ (6 to 10).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ producer.abortTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages again. The consumer should skip the above aborted messages and read
+ // them.
+ producer.beginTransaction()
+ (11 to 15).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 15): _*), // offset: 15, 16, 17*
+ WithKafkaProducer(topic, producer) { producer =>
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "16")).get()
+ producer.commitTransaction()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "17")).get()
+ producer.commitTransaction()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "18")).get()
+ producer.send(new ProducerRecord[String, String](topic, "19")).get()
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 17): _*), // offset: 18, 19*, 20
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 19): _*), // offset: 21*, 22, 23
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 19): _*) // offset: 24*
+ )
+ } finally {
+ producer.close()
+ }
+ }
+
+ test("read Kafka transactional messages: read_uncommitted") {
+ // This test will cover the following cases:
+ // 1. the whole batch contains no data messages
+ // 2. the first offset in a batch is not a committed data message
+ // 3. the last offset in a batch is not a committed data message
+ // 4. there is a gap in the middle of a batch
+
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.isolation.level", "read_uncommitted")
+ .option("maxOffsetsPerTrigger", 3)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ // Set a short timeout to make the test fast. When a batch contains no committed date
+ // messages, "poll" will wait until timeout.
+ .option("kafkaConsumer.pollTimeoutMs", 5000)
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ val producer = testUtils.createProducer(usingTrascation = true)
+ try {
+ producer.initTransactions()
+
+ testStream(mapped)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // 1 from smallest, 1 from middle, 8 from biggest
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages. They should be visible only after being committed.
--- End diff --
Why so? This is read_uncommitted, right?
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test FAILed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r209473432
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -31,6 +31,17 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread
+/**
+ * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
+ * This means it's either a transaction (commit or abort) marker, or an aborted message if
+ * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
missing. In other words, `nextOffsetToFetch` indicates the next offset to fetch.
+ */
+private[kafka010] class MissingOffsetException(
+ val offset: Long,
--- End diff --
maybe rename offset to something like missingOffset. It's weird to have a generically named field "offset" next to a specifically named field "nextOffsetToFetch".
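A sketch with the suggested rename applied:

```scala
private[kafka010] class MissingOffsetException(
    val missingOffset: Long,
    val nextOffsetToFetch: Long)
  extends Exception(
    s"Offsets in [$missingOffset, $nextOffsetToFetch) are missing: they are " +
      "either transaction markers, or aborted messages when " +
      "\"isolation.level\" is \"read_committed\"")
```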
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95297/
Test PASSed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r209474755
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
untilOffset: Long,
--- End diff --
Update the docs of this method to say that it can throw MissingOffsetException and what that means?
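A possible doc addition (a sketch of the wording only):

```scala
/**
 * Get the record at `offset`, polling Kafka when the pre-fetched data has
 * been drained.
 *
 * @throws MissingOffsetException if `offset` falls inside a gap left by
 *         transaction markers or aborted messages; the caller should resume
 *         from the `nextOffsetToFetch` carried by the exception.
 */
```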
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95063/
Test FAILed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211804879
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
}
)
}
+
+ test("read Kafka transactional messages: read_committed") {
+ // This test will cover the following cases:
+ // 1. the whole batch contains no data messages
+ // 2. the first offset in a batch is not a committed data message
+ // 3. the last offset in a batch is not a committed data message
+ // 4. there is a gap in the middle of a batch
+
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.isolation.level", "read_committed")
+ .option("maxOffsetsPerTrigger", 3)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ // Set a short timeout to make the test fast. When a batch contains no committed date
+ // messages, "poll" will wait until timeout.
+ .option("kafkaConsumer.pollTimeoutMs", 5000)
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ val producer = testUtils.createProducer(usingTrascation = true)
+ try {
+ producer.initTransactions()
+
+ testStream(mapped)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // 1 from smallest, 1 from middle, 8 from biggest
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages. They should be visible only after being committed.
+ producer.beginTransaction()
+ (1 to 5).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // Should not see any uncommitted messages
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 3: _*), // offset 0, 1, 2
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
--- End diff --
Why is this `waitUntilBatchProcessed` needed? CheckAnswer waits for the batch to complete anyway.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #94809 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94809/testReport)** for PR 22042 at commit [`a9b00b4`](https://github.com/apache/spark/commit/a9b00b4a22f0b6b364cd1b35e2d99923d8b233dc).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #94441 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94441/testReport)** for PR 22042 at commit [`dc18a6f`](https://github.com/apache/spark/commit/dc18a6ff59fe7c48ed188a4eb9a6abf04caee0bd).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #95297 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95297/testReport)** for PR 22042 at commit [`7a02921`](https://github.com/apache/spark/commit/7a02921950cda865e3cd45f1d1635212c2f707c0).
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #94808 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94808/testReport)** for PR 22042 at commit [`baef29f`](https://github.com/apache/spark/commit/baef29f2983560c8010681c9bb7e74f711c8f2e7).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #95236 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95236/testReport)** for PR 22042 at commit [`7a02921`](https://github.com/apache/spark/commit/7a02921950cda865e3cd45f1d1635212c2f707c0).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test PASSed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/22042
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r209475048
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
untilOffset: Long,
pollTimeoutMs: Long,
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
- if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
- // This is the first fetch, or the last pre-fetched data has been drained.
+ if (offset != nextOffsetInFetchedData) {
+ // This is the first fetch, or the fetched data has been reset.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
seek(offset)
poll(pollTimeoutMs)
+ } else if (!fetchedData.hasNext) {
+ // The last pre-fetched data has been drained.
+ if (offset < offsetAfterPoll) {
--- End diff --
It's hard to understand this condition because it's hard to understand what offsetAfterPoll means. Does it refer to the offset that will be fetched next by the KafkaConsumer?
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95319/
Test PASSed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #94441 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94441/testReport)** for PR 22042 at commit [`dc18a6f`](https://github.com/apache/spark/commit/dc18a6ff59fe7c48ed188a4eb9a6abf04caee0bd).
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95115/
Test PASSed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #95056 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95056/testReport)** for PR 22042 at commit [`f379d47`](https://github.com/apache/spark/commit/f379d47e30643fe92b751aa7aa374815ac66a55c).
* This patch **fails to build**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r212032759
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
}
)
}
+
+ test("read Kafka transactional messages: read_committed") {
+ // This test will cover the following cases:
+ // 1. the whole batch contains no data messages
+ // 2. the first offset in a batch is not a committed data message
+ // 3. the last offset in a batch is not a committed data message
+ // 4. there is a gap in the middle of a batch
+
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.isolation.level", "read_committed")
+ .option("maxOffsetsPerTrigger", 3)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ // Set a short timeout to make the test fast. When a batch contains no committed date
+ // messages, "poll" will wait until timeout.
+ .option("kafkaConsumer.pollTimeoutMs", 5000)
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ val producer = testUtils.createProducer(usingTrascation = true)
+ try {
+ producer.initTransactions()
+
+ testStream(mapped)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // 1 from smallest, 1 from middle, 8 from biggest
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages. They should be visible only after being committed.
+ producer.beginTransaction()
+ (1 to 5).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // Should not see any uncommitted messages
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 3: _*), // offset 0, 1, 2
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message]
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages and abort the transaction. They should not be read.
+ producer.beginTransaction()
+ (6 to 10).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ producer.abortTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages again. The consumer should skip the above aborted messages and read
+ // them.
+ producer.beginTransaction()
+ (11 to 15).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 15): _*), // offset: 15, 16, 17*
+ WithKafkaProducer(topic, producer) { producer =>
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "16")).get()
+ producer.commitTransaction()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "17")).get()
+ producer.commitTransaction()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "18")).get()
+ producer.send(new ProducerRecord[String, String](topic, "19")).get()
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 17): _*), // offset: 18, 19*, 20
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 19): _*), // offset: 21*, 22, 23
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 19): _*) // offset: 24*
+ )
+ } finally {
+ producer.close()
+ }
+ }
+
+ test("read Kafka transactional messages: read_uncommitted") {
+ // This test will cover the following cases:
+ // 1. the whole batch contains no data messages
+ // 2. the first offset in a batch is not a committed data message
+ // 3. the last offset in a batch is not a committed data message
+ // 4. there is a gap in the middle of a batch
+
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.isolation.level", "read_uncommitted")
+ .option("maxOffsetsPerTrigger", 3)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ // Set a short timeout to make the test fast. When a batch contains no committed date
+ // messages, "poll" will wait until timeout.
+ .option("kafkaConsumer.pollTimeoutMs", 5000)
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ val producer = testUtils.createProducer(usingTrascation = true)
+ try {
+ producer.initTransactions()
+
+ testStream(mapped)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // 1 from smallest, 1 from middle, 8 from biggest
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages. They should be visible only after being committed.
+ producer.beginTransaction()
+ (1 to 5).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 3: _*), // offset 0, 1, 2
--- End diff --
> Why only 3 records when 1 to 5 has been sent already and we are reading uncommitted data?
I'm using `maxOffsetsPerTrigger = 3` to cut the batches on purpose. Otherwise, it's really hard to cover all of the cases.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #94446 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94446/testReport)** for PR 22042 at commit [`dfea7e3`](https://github.com/apache/spark/commit/dfea7e363ef479c3783171bc3644be61d74beee7).
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211801549
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
offset: Long,
--- End diff --
Maybe rename this method to fetchRecord, to make it consistent with the return type.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/22042
cc @tdas
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r210423180
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -91,6 +90,17 @@ private[kafka010] case class InternalKafkaConsumer(
kafkaParams: ju.Map[String, Object]) extends Logging {
import InternalKafkaConsumer._
+ /**
+ * The internal object returned by the `fetchData` method. If `record` is empty, it means it is
+ * invisible (either a transaction message, or an aborted message when the consumer's
+ * `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch
+ * instead.
+ */
+ private case class FetchedRecord(
+ record: Option[ConsumerRecord[Array[Byte], Array[Byte]]],
--- End diff --
Can't we reuse the objects here? And do we need to have an Option, thus creating a lot of Option objects all the time?
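A minimal sketch of the allocation-free shape the later hunks settle on (see the `fetchedRecord.withRecord(...)` calls in r212521083 below); the exact fields here are an assumption:

    // A single mutable instance, reused across fetches; `record == null` plays
    // the role of `None`, so no Option (or FetchedRecord) is allocated per record.
    private val fetchedRecord: FetchedRecord = FetchedRecord(null, UNKNOWN_OFFSET)

    private case class FetchedRecord(
        var record: ConsumerRecord[Array[Byte], Array[Byte]],
        var nextOffsetToFetch: Long) {

      def withRecord(
          record: ConsumerRecord[Array[Byte], Array[Byte]],
          nextOffsetToFetch: Long): FetchedRecord = {
        this.record = record
        this.nextOffsetToFetch = nextOffsetToFetch
        this
      }
    }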
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #94809 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94809/testReport)** for PR 22042 at commit [`a9b00b4`](https://github.com/apache/spark/commit/a9b00b4a22f0b6b364cd1b35e2d99923d8b233dc).
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211786471
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -250,33 +294,42 @@ private[kafka010] case class InternalKafkaConsumer(
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
- failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
- if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
- // This is the first fetch, or the last pre-fetched data has been drained.
+ failOnDataLoss: Boolean): FetchedRecord = {
+ if (offset != nextOffsetInFetchedData) {
+ // This is the first fetch, or the fetched data has been reset.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
- seek(offset)
- poll(pollTimeoutMs)
+ poll(offset, pollTimeoutMs)
+ } else if (!fetchedData.hasNext) {
+ // The last pre-fetched data has been drained.
+ if (offset < offsetAfterPoll) {
--- End diff --
this is the place preventing me from making `offsetAfterPoll` be a local var.
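A later revision resolves this by folding the value into the fetched-data holder itself (it shows up as `fetchedData.offsetAfterPoll` in other hunks in this thread); a rough sketch of that holder, with the details assumed:

    // Keeps the post-poll position next to the records it was observed with,
    // instead of in a free-standing var on the consumer.
    private class FetchedData(
        private var records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
        var nextOffsetInFetchedData: Long,
        var offsetAfterPoll: Long) {

      def hasNext: Boolean = records.hasNext

      def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
        val r = records.next()
        nextOffsetInFetchedData = r.offset + 1
        r
      }

      def reset(): Unit = {
        records = ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]]()
        nextOffsetInFetchedData = UNKNOWN_OFFSET
        offsetAfterPoll = UNKNOWN_OFFSET
      }
    }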
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #94808 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94808/testReport)** for PR 22042 at commit [`baef29f`](https://github.com/apache/spark/commit/baef29f2983560c8010681c9bb7e74f711c8f2e7).
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94441/
Test FAILed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r209477156
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
untilOffset: Long,
pollTimeoutMs: Long,
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
- if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
- // This is the first fetch, or the last pre-fetched data has been drained.
+ if (offset != nextOffsetInFetchedData) {
+ // This is the first fetch, or the fetched data has been reset.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
seek(offset)
poll(pollTimeoutMs)
+ } else if (!fetchedData.hasNext) {
+ // The last pre-fetched data has been drained.
+ if (offset < offsetAfterPoll) {
+ // Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
+ resetFetchedData()
+ throw new MissingOffsetException(offset, offsetAfterPoll)
+ } else {
+ seek(offset)
+ poll(pollTimeoutMs)
+ }
}
if (!fetchedData.hasNext()) {
- // We cannot fetch anything after `poll`. Two possible cases:
+ // We cannot fetch anything after `poll`. Three possible cases:
// - `offset` is out of range so that Kafka returns nothing. Just throw
// `OffsetOutOfRangeException` to let the caller handle it.
// - Cannot fetch any data before timeout. TimeoutException will be thrown.
+ // - Fetched something but none of them are valid data messages. In this case, the position
+ // will be changed and we can use it to determine this case.
val range = getAvailableOffsetRange()
if (offset < range.earliest || offset >= range.latest) {
throw new OffsetOutOfRangeException(
Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
- } else {
+ } else if (offsetBeforePoll == offsetAfterPoll) {
--- End diff --
Just to be clear, can this happen only if there is a timeout? And if so, why not push this condition and exception into the poll() method, thus simplifying this method?
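A sketch of what moving the check into `poll` could look like (the field and method names are assumed from the surrounding hunks, not taken from the final code):

    private def poll(offset: Long, pollTimeoutMs: Long): Unit = {
      seek(offset)
      val p = consumer.poll(pollTimeoutMs)
      fetchedData = p.records(topicPartition).iterator
      offsetAfterPoll = consumer.position(topicPartition)
      // If the position did not move, Kafka returned nothing before the
      // timeout; raise the error here instead of in every caller.
      if (offset == offsetAfterPoll) {
        throw new TimeoutException(
          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
      }
    }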
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95236/
Test FAILed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #95063 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95063/testReport)** for PR 22042 at commit [`a06742f`](https://github.com/apache/spark/commit/a06742fd3d19c3ee6d9c957b446bc5017be009bc).
* This patch **fails to build**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2454/
Test PASSed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r212521083
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -239,56 +335,74 @@ private[kafka010] case class InternalKafkaConsumer(
}
/**
- * Get the record for the given offset if available. Otherwise it will either throw error
- * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
- * or null.
+ * Get the fetched record for the given offset if available.
+ *
+ * If the record is invisible (either a transaction message, or an aborted message when the
+ * consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the
+ * next offset to fetch.
+ *
+ * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will
+ * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
+ * method will return `null` if the next available record is within [offset, untilOffset).
*
* @throws OffsetOutOfRangeException if `offset` is out of range
* @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
*/
- private def fetchData(
+ private def fetchRecord(
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
- failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
- if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
- // This is the first fetch, or the last pre-fetched data has been drained.
- // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
- seek(offset)
- poll(pollTimeoutMs)
- }
-
- if (!fetchedData.hasNext()) {
- // We cannot fetch anything after `poll`. Two possible cases:
- // - `offset` is out of range so that Kafka returns nothing. Just throw
- // `OffsetOutOfRangeException` to let the caller handle it.
- // - Cannot fetch any data before timeout. TimeoutException will be thrown.
- val range = getAvailableOffsetRange()
- if (offset < range.earliest || offset >= range.latest) {
- throw new OffsetOutOfRangeException(
- Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+ failOnDataLoss: Boolean): FetchedRecord = {
+ if (offset != fetchedData.nextOffsetInFetchedData) {
+ // This is the first fetch, or the fetched data has been reset.
+ // Fetch records from Kafka and update `fetchedData`.
+ fetchData(offset, pollTimeoutMs)
+ } else if (!fetchedData.hasNext) { // The last pre-fetched data has been drained.
+ if (offset < fetchedData.offsetAfterPoll) {
+ // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask
+ // the next call to start from `fetchedData.offsetAfterPoll`.
+ fetchedData.reset()
+ return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
} else {
- throw new TimeoutException(
- s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
+ // Fetch records from Kafka and update `fetchedData`.
+ fetchData(offset, pollTimeoutMs)
}
+ }
+
+ if (!fetchedData.hasNext) {
+ // When we reach here, we have already tried to poll from Kafka. As `fetchedData` is still
+ // empty, all messages in [offset, fetchedData.offsetAfterPoll) are invisible. Return a
+ // record to ask the next call to start from `fetchedData.offsetAfterPoll`.
+ assert(offset <= fetchedData.offsetAfterPoll,
+ s"seek to $offset and poll but the offset was reset to ${fetchedData.offsetAfterPoll}")
+ fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
} else {
val record = fetchedData.next()
- nextOffsetInFetchedData = record.offset + 1
// In general, Kafka uses the specified offset as the start point, and tries to fetch the next
// available offset. Hence we need to handle offset mismatch.
if (record.offset > offset) {
+ val range = getAvailableOffsetRange()
+ if (range.earliest <= offset) {
+ // `offset` is still valid but the corresponding message is invisible. We should skip it
+ // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of
+ // `fetchRecord` can just return `record` directly.
+ fetchedData.previous()
+ return fetchedRecord.withRecord(null, record.offset)
+ }
// This may happen when some records aged out but their offsets already got verified
if (failOnDataLoss) {
reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
// Never happen as "reportDataLoss" will throw an exception
- null
+ throw new IllegalStateException(
+ "reportDataLoss didn't throw an exception when 'failOnDataLoss' is true")
} else {
if (record.offset >= untilOffset) {
reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
- null
+ // Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
+ fetchedRecord.withRecord(null, untilOffset)
} else {
reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
- record
+ fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
}
--- End diff --
nit: This can be unnested.
if ... else { if ... else ... } -> if ... else if ... else
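Applied to the tail of this hunk, the unnested shape would read (same behavior, one nesting level removed):

    if (failOnDataLoss) {
      reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
      // Never reached: reportDataLoss throws when failOnDataLoss is true.
      throw new IllegalStateException(
        "reportDataLoss didn't throw an exception when 'failOnDataLoss' is true")
    } else if (record.offset >= untilOffset) {
      reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
      // Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
      fetchedRecord.withRecord(null, untilOffset)
    } else {
      reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
      fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
    }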
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r210985375
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -346,11 +385,40 @@ private[kafka010] case class InternalKafkaConsumer(
consumer.seek(topicPartition, offset)
}
- private def poll(pollTimeoutMs: Long): Unit = {
+ /**
+ * Poll messages from Kafka starting from `offset` and set `fetchedData` and `offsetAfterPoll`.
+ * `fetchedData` may be empty if the Kafka fetches some messages but all of them are not visible
+ * messages (either transaction messages, or aborted messages when `isolation.level` is
+ * `read_committed`).
+ *
+ * @throws OffsetOutOfRangeException if `offset` is out of range.
+ * @throws TimeoutException if the consumer position is not changed after polling. It means the
+ * consumer polls nothing before timeout.
+ */
+ private def poll(offset: Long, pollTimeoutMs: Long): Unit = {
+ seek(offset)
val p = consumer.poll(pollTimeoutMs)
val r = p.records(topicPartition)
logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
- fetchedData = r.iterator
+ offsetAfterPoll = consumer.position(topicPartition)
--- End diff --
I strongly think that this should not be a var, but rather a clear return value. We have been burnt by too many mutable vars/defs (see all the flakiness caused by the structured ProgressReporter) and we should consciously try to improve this everywhere by not having vars all over the place.
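A sketch of the var-free alternative being requested, where `poll` returns what it observed instead of writing fields (the `PollResult` name is hypothetical):

    private case class PollResult(
        records: ju.List[ConsumerRecord[Array[Byte], Array[Byte]]],
        offsetAfterPoll: Long)

    private def poll(offset: Long, pollTimeoutMs: Long): PollResult = {
      seek(offset)
      val p = consumer.poll(pollTimeoutMs)
      val r = p.records(topicPartition)
      logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
      // Hand both results back to the caller; nothing on the consumer mutates.
      PollResult(r, consumer.position(topicPartition))
    }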
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/22042
Thanks! Merging to master.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211805409
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
}
)
}
+
+ test("read Kafka transactional messages: read_committed") {
+ // This test will cover the following cases:
+ // 1. the whole batch contains no data messages
+ // 2. the first offset in a batch is not a committed data message
+ // 3. the last offset in a batch is not a committed data message
+ // 4. there is a gap in the middle of a batch
+
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.isolation.level", "read_committed")
+ .option("maxOffsetsPerTrigger", 3)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ // Set a short timeout to make the test fast. When a batch contains no committed data
+ // messages, "poll" will wait until timeout.
+ .option("kafkaConsumer.pollTimeoutMs", 5000)
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ val producer = testUtils.createProducer(usingTrascation = true)
+ try {
+ producer.initTransactions()
+
+ testStream(mapped)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages. They should be visible only after being committed.
+ producer.beginTransaction()
+ (1 to 5).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // Should not see any uncommitted messages
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 3: _*), // offset 0, 1, 2
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message]
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages and abort the transaction. They should not be read.
+ producer.beginTransaction()
+ (6 to 10).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ producer.abortTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages again. The consumer should skip the above aborted messages and read
+ // them.
+ producer.beginTransaction()
+ (11 to 15).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 15): _*), // offset: 15, 16, 17*
+ WithKafkaProducer(topic, producer) { producer =>
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "16")).get()
+ producer.commitTransaction()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "17")).get()
+ producer.commitTransaction()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "18")).get()
+ producer.send(new ProducerRecord[String, String](topic, "19")).get()
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 17): _*), // offset: 18, 19*, 20
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 19): _*), // offset: 21*, 22, 23
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 19): _*) // offset: 24*
+ )
+ } finally {
+ producer.close()
+ }
+ }
+
+ test("read Kafka transactional messages: read_uncommitted") {
+ // This test will cover the following cases:
+ // 1. the whole batch contains no data messages
+ // 2. the first offset in a batch is not a committed data message
+ // 3. the last offset in a batch is not a committed data message
+ // 4. there is a gap in the middle of a batch
+
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.isolation.level", "read_uncommitted")
+ .option("maxOffsetsPerTrigger", 3)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ // Set a short timeout to make the test fast. When a batch contains no committed data
+ // messages, "poll" will wait until timeout.
+ .option("kafkaConsumer.pollTimeoutMs", 5000)
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ val producer = testUtils.createProducer(usingTrascation = true)
--- End diff --
You could define a testWithProducer method and wrap the try/finally in it.
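A sketch of such a helper (the name and placement are assumptions; `usingTrascation` is spelled as in the patch):

    private def testWithProducer(
        body: KafkaProducer[String, String] => Unit): Unit = {
      val producer = testUtils.createProducer(usingTrascation = true)
      try {
        producer.initTransactions()
        body(producer)
      } finally {
        producer.close()
      }
    }

With that, each test body shrinks to `testWithProducer { producer => testStream(mapped)(...) }` and the try/finally lives in one place.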
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211802489
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -288,7 +385,7 @@ private[kafka010] case class InternalKafkaConsumer(
null
--- End diff --
We should not be returning null EVER when we are using `FetchedRecord.record = null` to signify the lack of a record.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211801968
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
- failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
- if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
- // This is the first fetch, or the last pre-fetched data has been drained.
+ failOnDataLoss: Boolean): FetchedRecord = {
+ if (offset != fetchedData.nextOffsetInFetchedData) {
+ // This is the first fetch, or the fetched data has been reset.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
- seek(offset)
- poll(pollTimeoutMs)
- }
-
- if (!fetchedData.hasNext()) {
- // We cannot fetch anything after `poll`. Two possible cases:
- // - `offset` is out of range so that Kafka returns nothing. Just throw
- // `OffsetOutOfRangeException` to let the caller handle it.
- // - Cannot fetch any data before timeout. TimeoutException will be thrown.
- val range = getAvailableOffsetRange()
- if (offset < range.earliest || offset >= range.latest) {
- throw new OffsetOutOfRangeException(
- Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+ poll(offset, pollTimeoutMs)
+ } else if (!fetchedData.hasNext) {
+ // The last pre-fetched data has been drained.
--- End diff --
nit: I was confused about whether the above comment was for the `else if` above it or for the `if` below it. Maybe inline it with the `else if`, or leave a blank line after it, before the `if` below.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r209479417
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
untilOffset: Long,
pollTimeoutMs: Long,
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
- if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
- // This is the first fetch, or the last pre-fetched data has been drained.
+ if (offset != nextOffsetInFetchedData) {
+ // This is the first fetch, or the fetched data has been reset.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
seek(offset)
poll(pollTimeoutMs)
+ } else if (!fetchedData.hasNext) {
+ // The last pre-fetched data has been drained.
+ if (offset < offsetAfterPoll) {
+ // Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
+ resetFetchedData()
+ throw new MissingOffsetException(offset, offsetAfterPoll)
--- End diff --
So MissingOffsetException is only used to signal that some offsets may be missing due to control messages, and nothing else. And the higher-level function (i.e. `get`) just handles it by resetting the fetched offsets. Why not let this `fetchData` method handle the situation instead of creating a new exception just for this?
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2404/
Test PASSed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r209478033
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
untilOffset: Long,
pollTimeoutMs: Long,
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
- if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
- // This is the first fetch, or the last pre-fetched data has been drained.
+ if (offset != nextOffsetInFetchedData) {
+ // This is the first fetch, or the fetched data has been reset.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
seek(offset)
poll(pollTimeoutMs)
+ } else if (!fetchedData.hasNext) {
+ // The last pre-fetched data has been drained.
+ if (offset < offsetAfterPoll) {
+ // Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
+ resetFetchedData()
+ throw new MissingOffsetException(offset, offsetAfterPoll)
+ } else {
+ seek(offset)
+ poll(pollTimeoutMs)
+ }
}
if (!fetchedData.hasNext()) {
- // We cannot fetch anything after `poll`. Two possible cases:
+ // We cannot fetch anything after `poll`. Three possible cases:
// - `offset` is out of range so that Kafka returns nothing. Just throw
// `OffsetOutOfRangeException` to let the caller handle it.
// - Cannot fetch any data before timeout. TimeoutException will be thrown.
+ // - Fetched something but none of them are valid data messages. In this case, the position
+ // will be changed and we can use it to determine this case.
val range = getAvailableOffsetRange()
if (offset < range.earliest || offset >= range.latest) {
throw new OffsetOutOfRangeException(
Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
- } else {
+ } else if (offsetBeforePoll == offsetAfterPoll) {
throw new TimeoutException(
s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
+ } else {
+ assert(offset <= offsetAfterPoll,
+ s"seek to $offset and poll but the offset was reset to $offsetAfterPoll")
+ throw new MissingOffsetException(offset, offsetAfterPoll)
}
} else {
--- End diff --
Let's remove this else and reduce the condition nesting. The previous `if` statement always ends in an exception, so we can remove this else.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r209473316
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -95,6 +106,10 @@ private[kafka010] case class InternalKafkaConsumer(
ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
@volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET
+ @volatile private var offsetBeforePoll: Long = UNKNOWN_OFFSET
--- End diff --
Can you add some docs to explain what these 2 vars signify and why they are needed?
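A sketch of the kind of doc being asked for, with the wording inferred from how the two vars are used in the hunks above:

    // Consumer position before the most recent `poll`. If it equals the
    // position after that poll, the consumer fetched nothing before timing out.
    @volatile private var offsetBeforePoll: Long = UNKNOWN_OFFSET

    // Consumer position after the most recent `poll`. Offsets in
    // [nextOffsetInFetchedData, offsetAfterPoll) that were not returned are
    // invisible (transaction markers, or aborted messages under
    // `read_committed`) and should be skipped rather than treated as data loss.
    @volatile private var offsetAfterPoll: Long = UNKNOWN_OFFSET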
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #95319 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95319/testReport)** for PR 22042 at commit [`ea804cf`](https://github.com/apache/spark/commit/ea804cfe840196519cc9444be9bedf03d10aa11a).
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2596/
Test PASSed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test PASSed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211801254
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
- failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
- if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
- // This is the first fetch, or the last pre-fetched data has been drained.
+ failOnDataLoss: Boolean): FetchedRecord = {
+ if (offset != fetchedData.nextOffsetInFetchedData) {
+ // This is the first fetch, or the fetched data has been reset.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
- seek(offset)
- poll(pollTimeoutMs)
- }
-
- if (!fetchedData.hasNext()) {
- // We cannot fetch anything after `poll`. Two possible cases:
- // - `offset` is out of range so that Kafka returns nothing. Just throw
- // `OffsetOutOfRangeException` to let the caller handle it.
- // - Cannot fetch any data before timeout. TimeoutException will be thrown.
- val range = getAvailableOffsetRange()
- if (offset < range.earliest || offset >= range.latest) {
- throw new OffsetOutOfRangeException(
- Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+ poll(offset, pollTimeoutMs)
+ } else if (!fetchedData.hasNext) {
+ // The last pre-fetched data has been drained.
+ if (offset < fetchedData.offsetAfterPoll) {
+ // Offsets in [offset, offsetAfterPoll) are missing. We should skip them.
+ fetchedData.reset()
+ return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
} else {
- throw new TimeoutException(
- s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
+ poll(offset, pollTimeoutMs)
}
+ }
+
+ if (!fetchedData.hasNext) {
+ assert(offset <= fetchedData.offsetAfterPoll,
--- End diff --
Add comments here on what this case means.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r212504622
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala ---
@@ -331,6 +331,7 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(
offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
private val rangeToRead = resolveRange(offsetRange)
+
--- End diff --
unnecessary
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2215/
Test PASSed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r210422521
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread
-/**
- * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
- * This means it's either a transaction (commit or abort) marker, or an aborted message if
- * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
- * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch.
- */
-private[kafka010] class MissingOffsetException(
- val offset: Long,
- val nextOffsetToFetch: Long) extends Exception(
- s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch")
-
private[kafka010] sealed trait KafkaDataConsumer {
/**
- * Get the record for the given offset if available. Otherwise it will either throw error
- * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
- * or null.
+ * Get the record for the given offset if available.
+ *
+ * If the record is invisible (either a
+ * transaction message, or an aborted message when the consumer's `isolation.level` is
+ * `read_committed`), it will be skipped and this method will try to fetch next available record
+ * within [offset, untilOffset).
+ *
+ * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will
--- End diff --
if failOnDataLoss is *true* then it should throw an exception... isn't it?
nit: try its best
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #94446 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94446/testReport)** for PR 22042 at commit [`dfea7e3`](https://github.com/apache/spark/commit/dfea7e363ef479c3783171bc3644be61d74beee7).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test PASSed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r212522432
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -161,6 +161,22 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
s"AddKafkaData(topics = $topics, data = $data, message = $message)"
}
+ object WithKafkaProducer {
+ def apply(
+ topic: String,
+ producer: KafkaProducer[String, String])(
+ func: KafkaProducer[String, String] => Unit): AssertOnQuery = {
--- End diff --
nit: AssertOnQuery -> StreamAction
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/22042
> This patch fails Spark unit tests.
This is the flaky test I fixed in #22230
retest this please
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:
https://github.com/apache/spark/pull/22042
LGTM.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r209473392
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -31,6 +31,17 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread
+/**
+ * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
+ * This means it's either a transaction (commit or abort) marker, or an aborted message if
+ * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
+ * missing. In other words, `nextOffsetToFetch` indicates the next offset to fetch.
+ */
+private[kafka010] class MissingOffsetException(
--- End diff --
nit: Is this meant to be used outside this KafkaDataConsumer class? If not, then maybe make it an inner class of KafkaDataConsumer.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #95319 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95319/testReport)** for PR 22042 at commit [`ea804cf`](https://github.com/apache/spark/commit/ea804cfe840196519cc9444be9bedf03d10aa11a).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211801632
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -346,11 +437,40 @@ private[kafka010] case class InternalKafkaConsumer(
consumer.seek(topicPartition, offset)
}
- private def poll(pollTimeoutMs: Long): Unit = {
+ /**
+ * Poll messages from Kafka starting from `offset` and set `fetchedData` and `offsetAfterPoll`.
+ * `fetchedData` may be empty if the Kafka fetches some messages but all of them are not visible
+ * messages (either transaction messages, or aborted messages when `isolation.level` is
+ * `read_committed`).
+ *
+ * @throws OffsetOutOfRangeException if `offset` is out of range.
+ * @throws TimeoutException if the consumer position is not changed after polling. It means the
+ * consumer polls nothing before timeout.
+ */
+ private def poll(offset: Long, pollTimeoutMs: Long): Unit = {
--- End diff --
Maybe rename this method to be consistent with what it does: fetch data.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #95236 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95236/testReport)** for PR 22042 at commit [`7a02921`](https://github.com/apache/spark/commit/7a02921950cda865e3cd45f1d1635212c2f707c0).
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #95056 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95056/testReport)** for PR 22042 at commit [`f379d47`](https://github.com/apache/spark/commit/f379d47e30643fe92b751aa7aa374815ac66a55c).
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211805275
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
}
)
}
+
+ test("read Kafka transactional messages: read_committed") {
+ // This test will cover the following cases:
+ // 1. the whole batch contains no data messages
+ // 2. the first offset in a batch is not a committed data message
+ // 3. the last offset in a batch is not a committed data message
+ // 4. there is a gap in the middle of a batch
+
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.isolation.level", "read_committed")
+ .option("maxOffsetsPerTrigger", 3)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ // Set a short timeout to make the test fast. When a batch contains no committed data
+ // messages, "poll" will wait until timeout.
+ .option("kafkaConsumer.pollTimeoutMs", 5000)
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ val producer = testUtils.createProducer(usingTrascation = true)
+ try {
+ producer.initTransactions()
+
+ testStream(mapped)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages. They should be visible only after being committed.
+ producer.beginTransaction()
+ (1 to 5).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // Should not see any uncommitted messages
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 3: _*), // offset 0, 1, 2
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message]
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages and abort the transaction. They should not be read.
+ producer.beginTransaction()
+ (6 to 10).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ producer.abortTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages again. The consumer should skip the above aborted messages and read
+ // them.
+ producer.beginTransaction()
+ (11 to 15).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 15): _*), // offset: 15, 16, 17*
--- End diff --
Use CheckNewAnswer instead of the cumulative CheckAnswer.
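For example, the two steps above would become (a sketch; `CheckNewAnswer` asserts only the rows produced since the previous check, so each batch's expectation stands alone):

    AdvanceManualClock(100),
    waitUntilBatchProcessed,
    CheckNewAnswer(11, 12, 13), // offset: 12, 13, 14
    AdvanceManualClock(100),
    waitUntilBatchProcessed,
    CheckNewAnswer(14, 15), // offset: 15, 16, 17*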
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1961/
Test PASSed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test PASSed.
---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r210422755
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread
-/**
- * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
- * This means it's either a transaction (commit or abort) marker, or an aborted message if
- * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
- * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch.
- */
-private[kafka010] class MissingOffsetException(
- val offset: Long,
- val nextOffsetToFetch: Long) extends Exception(
- s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch")
-
private[kafka010] sealed trait KafkaDataConsumer {
/**
- * Get the record for the given offset if available. Otherwise it will either throw error
- * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
- * or null.
+ * Get the record for the given offset if available.
+ *
+ * If the record is invisible (either a
+ * transaction message, or an aborted message when the consumer's `isolation.level` is
+ * `read_committed`), it will be skipped and this method will try to fetch next available record
+ * within [offset, untilOffset).
+ *
+ * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will
+ * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `true`, this
--- End diff --
Will we throw an exception even when it's a control message and there is no real data loss?
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Merged build finished. Test FAILed.
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/22042
retest this please
---
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #95297 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95297/testReport)** for PR 22042 at commit [`7a02921`](https://github.com/apache/spark/commit/7a02921950cda865e3cd45f1d1635212c2f707c0).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94808/
Test PASSed.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r209479551
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala ---
@@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD(
offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray
}
- override def count(): Long = offsetRanges.map(_.size).sum
--- End diff --
Goooood catch. That would have never occurred to me!
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211803267
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
@@ -337,6 +338,7 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader(
val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
if (record != null) {
nextRow = converter.toUnsafeRow(record)
+ nextOffset = record.offset + 1
--- End diff --
why this change?
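A plausible reading, offered as an inference rather than a quoted reply: once offsets can be non-consecutive, get(nextOffset, ...) may skip invisible offsets and return a record at a later offset than the one requested, so the reader must advance from the record it actually received. A minimal annotated sketch reusing the identifiers from the diff above:

    val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
    if (record != null) {
      nextRow = converter.toUnsafeRow(record)
      // record.offset may be greater than nextOffset when a gap (a transaction
      // marker or aborted data) was skipped, so advancing with "nextOffset + 1"
      // could re-request offsets that will never hold data.
      nextOffset = record.offset + 1
    }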
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211805821
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
}
)
}
+
+ test("read Kafka transactional messages: read_committed") {
+ // This test will cover the following cases:
+ // 1. the whole batch contains no data messages
+ // 2. the first offset in a batch is not a committed data message
+ // 3. the last offset in a batch is not a committed data message
+ // 4. there is a gap in the middle of a batch
+
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.isolation.level", "read_committed")
+ .option("maxOffsetsPerTrigger", 3)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ // Set a short timeout to make the test fast. When a batch contains no committed data
+ // messages, "poll" will wait until timeout.
+ .option("kafkaConsumer.pollTimeoutMs", 5000)
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ val producer = testUtils.createProducer(usingTransaction = true)
+ try {
+ producer.initTransactions()
+
+ testStream(mapped)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // The topic is empty, so the first batch should contain no data
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages. They should be visible only after being committed.
+ producer.beginTransaction()
+ (1 to 5).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // Should not see any uncommitted messages
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 3: _*), // offset 0, 1, 2
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message]
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages and abort the transaction. They should not be read.
+ producer.beginTransaction()
+ (6 to 10).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ producer.abortTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages again. The consumer should skip the above aborted messages and read
+ // them.
+ producer.beginTransaction()
+ (11 to 15).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 15): _*), // offset: 15, 16, 17*
+ WithKafkaProducer(topic, producer) { producer =>
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "16")).get()
+ producer.commitTransaction()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "17")).get()
+ producer.commitTransaction()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "18")).get()
+ producer.send(new ProducerRecord[String, String](topic, "19")).get()
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 17): _*), // offset: 18, 19*, 20
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 19): _*), // offset: 21*, 22, 23
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 19): _*) // offset: 24*
+ )
+ } finally {
+ producer.close()
+ }
+ }
+
+ test("read Kafka transactional messages: read_uncommitted") {
+ // This test will cover the following cases:
+ // 1. the whole batch contains no data messages
+ // 2. the first offset in a batch is not a committed data message
+ // 3. the last offset in a batch is not a committed data message
+ // 4. there is a gap in the middle of a batch
+
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.isolation.level", "read_uncommitted")
+ .option("maxOffsetsPerTrigger", 3)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ // Set a short timeout to make the test fast. When a batch contains no committed data
+ // messages, "poll" will wait until timeout.
+ .option("kafkaConsumer.pollTimeoutMs", 5000)
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ val producer = testUtils.createProducer(usingTransaction = true)
+ try {
+ producer.initTransactions()
+
+ testStream(mapped)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // The topic is empty, so the first batch should contain no data
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages. They should be visible only after being committed.
+ producer.beginTransaction()
+ (1 to 5).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 3: _*), // offset 0, 1, 2
--- End diff --
Why only 3 records when 1 to 5 has been sent already and we are reading uncommitted data?
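Two hedged notes for readers, inferred from the test code rather than from a reply in this archive. First, the reader sets .option("maxOffsetsPerTrigger", 3), which caps every micro-batch at 3 offsets regardless of isolation level, so even uncommitted records surface only 3 per batch. Second, the starred offsets in both tests follow one bookkeeping rule: every commitTransaction or abortTransaction writes a control marker that consumes one offset. A worked trace for the read_committed test, reconstructed from its own comments:

    offsets  0-4  : data 1..5 (first transaction)
    offset   5    : commit marker        -> the "5*" above
    offsets  6-10 : data 6..10, aborted  -> "6*".."10*" (invisible under read_committed)
    offset   11   : abort marker         -> "11*"
    offsets 12-16 : data 11..15
    offset   17   : commit marker        -> "17*"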
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211803763
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -160,6 +160,23 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
s"AddKafkaData(topics = $topics, data = $data, message = $message)"
}
+ object WithKafkaProducer {
+ def apply(
+ topic: String,
+ producer: KafkaProducer[String, String])(
+ func: KafkaProducer[String, String] => Unit): AssertOnQuery = {
+ AssertOnQuery(_ => {
--- End diff --
nit: use Execute
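For reference, a minimal sketch of how the nit might be applied, assuming StreamTest's Execute helper (which wraps a side effect in an AssertOnQuery); this is a guess at the follow-up, not the committed code:

    object WithKafkaProducer {
      def apply(
          topic: String,
          producer: KafkaProducer[String, String])(
          func: KafkaProducer[String, String] => Unit): AssertOnQuery = {
        // Execute runs the side effect against the running query and always passes
        Execute { _ => func(producer) }
      }
    }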
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22042
**[Test build #95115 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95115/testReport)** for PR 22042 at commit [`603e0bc`](https://github.com/apache/spark/commit/603e0bc9cc822ec3151159a88a521ac063932f11).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r208676022
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala ---
@@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD(
offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray
}
- override def count(): Long = offsetRanges.map(_.size).sum
--- End diff --
The assumption in these methods (that the size of an offset range equals the number of records it contains) is no longer right, so they are removed.
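A hedged, illustrative spelling-out of that assumption (the offset layout below is invented for this note):

    // Removed shortcut: it equated an offset range's width with its record count.
    //   override def count(): Long = offsetRanges.map(_.size).sum
    // With a transactional producer, a range such as [0, 6) may hold
    //   [data, data, commit marker, data, aborted data, abort marker]
    // i.e. 6 offsets but only 3 rows visible to the query, so any size-based
    // count()/isEmpty() shortcut can over-count.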
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94446/
Test PASSed.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211786163
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread
-/**
- * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
- * This means it's either a transaction (commit or abort) marker, or an aborted message if
- * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
- * missing. In other words, `nextOffsetToFetch` indicates the next offset to fetch.
- */
-private[kafka010] class MissingOffsetException(
- val offset: Long,
- val nextOffsetToFetch: Long) extends Exception(
- s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch")
-
private[kafka010] sealed trait KafkaDataConsumer {
/**
- * Get the record for the given offset if available. Otherwise it will either throw error
- * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
- * or null.
+ * Get the record for the given offset if available.
+ *
+ * If the record is invisible (either a
+ * transaction message, or an aborted message when the consumer's `isolation.level` is
+ * `read_committed`), it will be skipped and this method will try to fetch next available record
+ * within [offset, untilOffset).
+ *
+ * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will
+ * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `true`, this
--- End diff --
> Will we throw an exception even when its a control message and there is no real data loss?
No. `It will be skipped and this method will try to fetch next available record within [offset, untilOffset).`
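To make that contract concrete, a minimal self-contained sketch of such a skip loop; every name is illustrative (this is not the PR's implementation, and the stubs stand in for the consumer's fetched-data buffer):

    case class Record(offset: Long, value: String)

    // Illustrative stubs for the consumer's internal state.
    def fetchedRecordAt(offset: Long): Option[Record] = None
    def isMarkerOrAborted(offset: Long): Boolean = true

    def getNextVisible(offset: Long, untilOffset: Long, failOnDataLoss: Boolean): Option[Record] = {
      var current = offset
      while (current < untilOffset) {
        fetchedRecordAt(current) match {
          case Some(record) =>
            return Some(record)            // a committed data message
          case None if isMarkerOrAborted(current) =>
            current += 1                   // invisible offset: skip silently, no exception
          case None if failOnDataLoss =>
            // a genuinely unavailable offset, e.g. aged out by retention
            throw new IllegalStateException(s"Offset $current was lost")
          case None =>
            current += 1                   // tolerate the loss and keep scanning
        }
      }
      None                                 // nothing visible in [offset, untilOffset)
    }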
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22042
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2400/
Test PASSed.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211786183
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread
-/**
- * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
- * This means it's either a transaction (commit or abort) marker, or an aborted message if
- * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
- * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch.
- */
-private[kafka010] class MissingOffsetException(
- val offset: Long,
- val nextOffsetToFetch: Long) extends Exception(
- s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch")
-
private[kafka010] sealed trait KafkaDataConsumer {
/**
- * Get the record for the given offset if available. Otherwise it will either throw error
- * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
- * or null.
+ * Get the record for the given offset if available.
+ *
+ * If the record is invisible (either a
+ * transaction message, or an aborted message when the consumer's `isolation.level` is
+ * `read_committed`), it will be skipped and this method will try to fetch next available record
+ * within [offset, untilOffset).
+ *
+ * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will
--- End diff --
Good catch