Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2018/02/09 01:54:06 UTC
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
GitHub user tdas opened a pull request:
https://github.com/apache/spark/pull/20554
[SPARK-23362][SS] Migrate Kafka Microbatch source to v2
## What changes were proposed in this pull request?
Migrating KafkaSource (with data source v1) to KafkaMicroBatchReader (with data source v2).
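For context, the data source v2 micro-batch API drives the new reader through `setOffsetRange`, `createDataReaderFactories`, and `commit` (see the diff discussed below). The following is a minimal, hypothetical sketch of how one micro-batch could be driven against a `MicroBatchReader` such as `KafkaMicroBatchReader`, using only the methods that appear in this PR's diff; the `runOneBatch` helper, its parameters, and the row handling are illustrative assumptions, not the engine's actual planner code.
```scala
import java.util.Optional
import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}

// Illustrative driver loop for a single micro-batch (hypothetical helper, not Spark code).
def runOneBatch(reader: MicroBatchReader, lastCommitted: Option[Offset]): Offset = {
  // Resolve this batch's offset range: an empty start means "use the initial offsets",
  // an empty end means "read up to the latest available offsets".
  reader.setOffsetRange(Optional.ofNullable(lastCommitted.orNull), Optional.empty[Offset]())

  // One factory per Kafka offset range; each factory becomes one task on an executor.
  val factories: Seq[DataReaderFactory[Row]] = reader.createDataReaderFactories().asScala
  factories.foreach { factory =>
    val dataReader: DataReader[Row] = factory.createDataReader()
    try {
      while (dataReader.next()) {
        val row: Row = dataReader.get() // key, value, topic, partition, offset, timestamp, ...
        // ... hand the row to the rest of the query plan ...
      }
    } finally {
      dataReader.close()
    }
  }

  val end = reader.getEndOffset
  reader.commit(end) // a no-op for this source; progress is tracked in the offset log
  end
}
```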
## How was this patch tested?
Existing tests; a few were modified to be better tests than the originals.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tdas/spark SPARK-23362
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/20554.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #20554
----
commit 3ed2a509276194214875f39e1e18d8093155c54c
Author: Tathagata Das <ta...@...>
Date: 2018-02-09T01:46:56Z
Migrated
----
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r168120568
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
+ * must make sure all messages in a topic have been processed before deleting the topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query that uses Kafka may not be stoppable.
+ * To avoid this issue, make sure the query is stopped before stopping the Kafka brokers,
+ * and do not use incorrect broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+ kafkaOffsetReader: KafkaOffsetReader,
+ executorKafkaParams: ju.Map[String, Object],
+ options: DataSourceOptions,
+ metadataPath: String,
+ startingOffsets: KafkaOffsetRangeLimit,
+ failOnDataLoss: Boolean)
+ extends MicroBatchReader with Logging {
+
+ type PartitionOffsetMap = Map[TopicPartition, Long]
+
+ private var startPartitionOffsets: PartitionOffsetMap = _
+ private var endPartitionOffsets: PartitionOffsetMap = _
+
+ private val pollTimeoutMs = options.getLong(
+ "kafkaConsumer.pollTimeoutMs",
+ SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+ private val maxOffsetsPerTrigger =
+ Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+ /**
+ * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+ * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+ * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+ */
+ private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
+
+ override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
+ // Make sure initialPartitionOffsets is initialized
+ initialPartitionOffsets
+
+ startPartitionOffsets = Option(start.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse(initialPartitionOffsets)
+
+ endPartitionOffsets = Option(end.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse {
+ val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+ maxOffsetsPerTrigger.map { maxOffsets =>
+ rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+ }.getOrElse {
+ latestPartitionOffsets
+ }
+ }
+ }
+
+ override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
+ // Find the new partitions, and get their earliest offsets
+ val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+ val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+ if (newPartitionOffsets.keySet != newPartitions) {
+ // We cannot get from offsets for some partitions. It means they got deleted.
+ val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+ reportDataLoss(
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ }
+ logInfo(s"Partitions added: $newPartitionOffsets")
+ newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+ reportDataLoss(
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ }
+
+ // Find deleted partitions, and report data loss if required
+ val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+ if (deletedPartitions.nonEmpty) {
+ reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+ }
+
+ // Use the until partitions to calculate offset ranges to ignore partitions that have
+ // been deleted
+ val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+ // Ignore partitions that we don't know the from offsets.
+ newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+ }.toSeq
+ logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+ val sortedExecutors = getSortedExecutorList()
+ val numExecutors = sortedExecutors.length
+ logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+ // Calculate offset ranges
+ val factories = topicPartitions.flatMap { tp =>
+ val fromOffset = startPartitionOffsets.get(tp).getOrElse {
+ newPartitionOffsets.getOrElse(
+ tp, {
+ // This should not happen since newPartitionOffsets contains all partitions not in
+ // fromPartitionOffsets
+ throw new IllegalStateException(s"$tp doesn't have a from offset")
+ })
+ }
+ val untilOffset = endPartitionOffsets(tp)
+
+ if (untilOffset >= fromOffset) {
+ // This allows cached KafkaConsumers in the executors to be re-used to read the same
+ // partition in every batch.
+ val preferredLoc = if (numExecutors > 0) {
+ Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
+ } else None
+ val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
+ Some(
+ new KafkaMicroBatchDataReaderFactory(
+ range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
+ } else {
+ reportDataLoss(
+ s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed")
+ None
+ }
+ }
+ factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
+ }
+
+ override def getStartOffset: Offset = {
+ KafkaSourceOffset(startPartitionOffsets)
+ }
+
+ override def getEndOffset: Offset = {
+ KafkaSourceOffset(endPartitionOffsets)
+ }
+
+ override def deserializeOffset(json: String): Offset = {
+ KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+ }
+
+ override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+ override def commit(end: Offset): Unit = {}
+
+ override def stop(): Unit = {}
+
+ override def toString(): String = s"Kafka[$kafkaOffsetReader]"
+
+ /**
+ * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
+ * the checkpoint.
+ */
+ private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
+ // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
+ // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
+ // (KAFKA-1894).
+ assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+
+ // SparkSession is required for getting Hadoop configuration for writing to checkpoints
+ assert(SparkSession.getActiveSession.nonEmpty)
+
+ val metadataLog =
+ new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
+ metadataLog.get(0).getOrElse {
+ val offsets = startingOffsets match {
+ case EarliestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
+ case LatestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+ case SpecificOffsetRangeLimit(p) =>
+ kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
+ }
+ metadataLog.add(0, offsets)
+ logInfo(s"Initial offsets: $offsets")
+ offsets
+ }.partitionToOffsets
+ }
+
+ /** Proportionally distribute limit number of offsets among topicpartitions */
+ private def rateLimit(
+ limit: Long,
+ from: PartitionOffsetMap,
+ until: PartitionOffsetMap): PartitionOffsetMap = {
+ val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+ val sizes = until.flatMap {
+ case (tp, end) =>
+ // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+ from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+ val size = end - begin
+ logDebug(s"rateLimit $tp size is $size")
+ if (size > 0) Some(tp -> size) else None
+ }
+ }
+ val total = sizes.values.sum.toDouble
+ if (total < 1) {
+ until
+ } else {
+ until.map {
+ case (tp, end) =>
+ tp -> sizes.get(tp).map { size =>
+ val begin = from.get(tp).getOrElse(fromNew(tp))
+ val prorate = limit * (size / total)
+ // Don't completely starve small topicpartitions
+ val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+ // Paranoia, make sure not to return an offset that's past end
+ Math.min(end, off)
+ }.getOrElse(end)
+ }
+ }
+ }
+
+ private def getSortedExecutorList(): Array[String] = {
+
+ def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+ if (a.host == b.host) {
+ a.executorId > b.executorId
+ } else {
+ a.host > b.host
+ }
+ }
+
+ val bm = SparkEnv.get.blockManager
+ bm.master.getPeers(bm.blockManagerId).toArray
+ .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+ .sortWith(compare)
+ .map(_.toString)
+ }
+
+ /**
+ * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+ * Otherwise, just log a warning.
+ */
+ private def reportDataLoss(message: String): Unit = {
+ if (failOnDataLoss) {
+ throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+ } else {
+ logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+ }
+ }
+
+ /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
+ class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
+ extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
+
+ val VERSION = 1
+
+ override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+ out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush
+ }
+
+ override def deserialize(in: InputStream): KafkaSourceOffset = {
+ in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+ // HDFSMetadataLog guarantees that it never creates a partial file.
+ assert(content.length != 0)
+ if (content(0) == 'v') {
+ val indexOfNewLine = content.indexOf("\n")
+ if (indexOfNewLine > 0) {
+ val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+ KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
+ } else {
+ throw new IllegalStateException(
+ s"Log file was malformed: failed to detect the log file version line.")
+ }
+ } else {
+ // The log was generated by Spark 2.1.0
+ KafkaSourceOffset(SerializedOffset(content))
+ }
+ }
+ }
+}
+
+/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReaderFactory(
+ range: KafkaOffsetRange,
+ preferredLoc: Option[String],
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends DataReaderFactory[Row] {
+
+ override def preferredLocations(): Array[String] = preferredLoc.toArray
+
+ override def createDataReader(): DataReader[Row] = new KafkaMicroBatchDataReader(
+ range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+}
+
+/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReader(
+ offsetRange: KafkaOffsetRange,
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends DataReader[Row] with Logging {
+
+ private val consumer = CachedKafkaConsumer.getOrCreate(
+ offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+ private val rangeToRead = resolveRange(offsetRange)
+ private val timestamp = new java.sql.Timestamp(0)
+
+ private var nextOffset = rangeToRead.fromOffset
+ private var nextRow: Row = _
+
+ override def next(): Boolean = {
+ if (nextOffset < rangeToRead.untilOffset) {
+ val cr = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
+ if (cr != null) {
+ timestamp.setTime(cr.timestamp)
+ nextRow = Row(
--- End diff --
I checked the code, and I don't think there should be a performance regression. Here is how the data is processed:
- Old source: wraps objects into a GenericInternalRow, and then LogicalRDD/RDDScanExec essentially uses an UnsafeProjection to write them into an UnsafeRow before returning them as an InternalRow.
- New BatchReader: wraps objects into a GenericRow, and then DataSourceV2Relation/DataSourceV2ScanExec/DatasourceRDD essentially uses a RowEncoder to write them into an UnsafeRow before returning them as an InternalRow.
I am running tests to measure the throughput of the two sources. Since we are not removing the V1 source, we can always fall back to V1 and fix V2 later.
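For reference, here is a minimal, self-contained sketch of the two conversion paths described above, using Spark 2.3-era catalyst/internal APIs. The schema and values are made up for illustration; this is not code from this PR.
```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

val schema = StructType(Seq(StructField("key", StringType), StructField("value", StringType)))

// V1 path: the source builds an InternalRow directly, and the scan applies an UnsafeProjection.
val v1Row: InternalRow = new GenericInternalRow(Array[Any](
  UTF8String.fromString("k"), UTF8String.fromString("v")))
val unsafeFromV1: UnsafeRow = UnsafeProjection.create(schema)(v1Row)

// V2 path (this PR): the reader returns an external Row, and the scan uses a RowEncoder
// to serialize it into an UnsafeRow-backed InternalRow.
val v2Row: Row = Row("k", "v")
val unsafeFromV2: InternalRow = RowEncoder(schema).resolveAndBind().toRow(v2Row)

// Both paths end with the data written into an UnsafeRow, which is why no large
// regression is expected; the throughput tests mentioned above should confirm this.
```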
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/906/
Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87497/
Test PASSed.
---
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167123837
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
+ * must make sure all messages in a topic have been processed before deleting the topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query that uses Kafka may not be stoppable.
+ * To avoid this issue, make sure the query is stopped before stopping the Kafka brokers,
+ * and do not use incorrect broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+ kafkaOffsetReader: KafkaOffsetReader,
+ executorKafkaParams: ju.Map[String, Object],
+ options: DataSourceOptions,
+ metadataPath: String,
+ startingOffsets: KafkaOffsetRangeLimit,
+ failOnDataLoss: Boolean)
+ extends MicroBatchReader with Logging {
+
+ type PartitionOffsetMap = Map[TopicPartition, Long]
+
+ private var startPartitionOffsets: PartitionOffsetMap = _
+ private var endPartitionOffsets: PartitionOffsetMap = _
+
+ private val pollTimeoutMs = options.getLong(
+ "kafkaConsumer.pollTimeoutMs",
+ SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+ private val maxOffsetsPerTrigger =
+ Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+ /**
+ * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+ * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+ * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+ */
+ private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
+
+ override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
+ // Make sure initialPartitionOffsets is initialized
+ initialPartitionOffsets
+
+ startPartitionOffsets = Option(start.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse(initialPartitionOffsets)
+
+ endPartitionOffsets = Option(end.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse {
+ val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+ maxOffsetsPerTrigger.map { maxOffsets =>
+ rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+ }.getOrElse {
+ latestPartitionOffsets
+ }
+ }
+ }
+
+ override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
+ // Find the new partitions, and get their earliest offsets
+ val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+ val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+ if (newPartitionOffsets.keySet != newPartitions) {
+ // We cannot get from offsets for some partitions. It means they got deleted.
+ val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+ reportDataLoss(
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ }
+ logInfo(s"Partitions added: $newPartitionOffsets")
+ newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+ reportDataLoss(
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ }
+
+ // Find deleted partitions, and report data loss if required
+ val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+ if (deletedPartitions.nonEmpty) {
+ reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+ }
+
+ // Use the until partitions to calculate offset ranges to ignore partitions that have
+ // been deleted
+ val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+ // Ignore partitions that we don't know the from offsets.
+ newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+ }.toSeq
+ logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+ val sortedExecutors = getSortedExecutorList()
+ val numExecutors = sortedExecutors.length
+ logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+ // Calculate offset ranges
+ val factories = topicPartitions.flatMap { tp =>
+ val fromOffset = startPartitionOffsets.get(tp).getOrElse {
+ newPartitionOffsets.getOrElse(
+ tp, {
+ // This should not happen since newPartitionOffsets contains all partitions not in
+ // fromPartitionOffsets
+ throw new IllegalStateException(s"$tp doesn't have a from offset")
+ })
+ }
+ val untilOffset = endPartitionOffsets(tp)
+
+ if (untilOffset >= fromOffset) {
+ // This allows cached KafkaConsumers in the executors to be re-used to read the same
+ // partition in every batch.
+ val preferredLoc = if (numExecutors > 0) {
+ Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
+ } else None
+ val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
+ Some(
+ new KafkaMicroBatchDataReaderFactory(
+ range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
+ } else {
+ reportDataLoss(
+ s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed")
+ None
+ }
+ }
+ factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
+ }
+
+ override def getStartOffset: Offset = {
+ KafkaSourceOffset(startPartitionOffsets)
+ }
+
+ override def getEndOffset: Offset = {
+ KafkaSourceOffset(endPartitionOffsets)
+ }
+
+ override def deserializeOffset(json: String): Offset = {
+ KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+ }
+
+ override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+ override def commit(end: Offset): Unit = {}
+
+ override def stop(): Unit = {}
+
+ override def toString(): String = s"Kafka[$kafkaOffsetReader]"
+
+ /**
+ * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
+ * the checkpoint.
+ */
+ private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
+ // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
+ // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
+ // (KAFKA-1894).
+ assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+
+ // SparkSession is required for getting Hadoop configuration for writing to checkpoints
+ assert(SparkSession.getActiveSession.nonEmpty)
+
+ val metadataLog =
+ new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
+ metadataLog.get(0).getOrElse {
+ val offsets = startingOffsets match {
+ case EarliestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
+ case LatestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+ case SpecificOffsetRangeLimit(p) =>
+ kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
+ }
+ metadataLog.add(0, offsets)
+ logInfo(s"Initial offsets: $offsets")
+ offsets
+ }.partitionToOffsets
+ }
+
+ /** Proportionally distribute limit number of offsets among topicpartitions */
+ private def rateLimit(
--- End diff --
Same as KafkaSource.rateLimit.
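For readers who have not seen KafkaSource.rateLimit: below is a self-contained re-statement of the proportional rate-limiting logic quoted above, with hypothetical partition names and numbers. It mirrors the quoted code (minus the fetch of earliest offsets for new partitions) but is a sketch, not the code itself.
```scala
// Distribute a total offset budget (`limit`, i.e. maxOffsetsPerTrigger) across partitions
// in proportion to how many unread offsets each partition has.
def rateLimitSketch(
    limit: Long,
    from: Map[String, Long],   // partition -> first unread offset
    until: Map[String, Long]   // partition -> latest available offset
  ): Map[String, Long] = {
  val sizes = until.flatMap { case (tp, end) =>
    from.get(tp).map(begin => tp -> (end - begin)).filter(_._2 > 0)
  }
  val total = sizes.values.sum.toDouble
  if (total < 1) until
  else until.map { case (tp, end) =>
    tp -> sizes.get(tp).map { size =>
      val begin = from(tp)
      val prorate = limit * (size / total)
      // Never starve a small partition entirely, and never run past `end`.
      val off = begin + (if (prorate < 1) math.ceil(prorate) else math.floor(prorate)).toLong
      math.min(end, off)
    }.getOrElse(end)
  }
}

// Example: 100 unread offsets (80 in p0, 20 in p1) and a budget of 10 yields
// 8 offsets from p0 and 2 from p1:
// rateLimitSketch(10, Map("p0" -> 0L, "p1" -> 0L), Map("p0" -> 80L, "p1" -> 20L))
//   == Map("p0" -> 8L, "p1" -> 2L)
```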
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87238 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87238/testReport)** for PR 20554 at commit [`3ed2a50`](https://github.com/apache/spark/commit/3ed2a509276194214875f39e1e18d8093155c54c).
* This patch passes all tests.
* This patch **does not merge cleanly**.
* This patch adds no public classes.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87238/
Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/732/
Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167815276
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
+ * must make sure all messages in a topic have been processed before deleting the topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query that uses Kafka may not be stoppable.
+ * To avoid this issue, make sure the query is stopped before stopping the Kafka brokers,
+ * and do not use incorrect broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+ kafkaOffsetReader: KafkaOffsetReader,
+ executorKafkaParams: ju.Map[String, Object],
+ options: DataSourceOptions,
+ metadataPath: String,
+ startingOffsets: KafkaOffsetRangeLimit,
+ failOnDataLoss: Boolean)
+ extends MicroBatchReader with Logging {
+
+ type PartitionOffsetMap = Map[TopicPartition, Long]
+
+ private var startPartitionOffsets: PartitionOffsetMap = _
+ private var endPartitionOffsets: PartitionOffsetMap = _
+
+ private val pollTimeoutMs = options.getLong(
+ "kafkaConsumer.pollTimeoutMs",
+ SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+ private val maxOffsetsPerTrigger =
+ Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+ /**
+ * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+ * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+ * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+ */
+ private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
+
+ override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
+ // Make sure initialPartitionOffsets is initialized
+ initialPartitionOffsets
+
+ startPartitionOffsets = Option(start.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse(initialPartitionOffsets)
+
+ endPartitionOffsets = Option(end.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse {
+ val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+ maxOffsetsPerTrigger.map { maxOffsets =>
+ rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+ }.getOrElse {
+ latestPartitionOffsets
+ }
+ }
+ }
+
+ override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
+ // Find the new partitions, and get their earliest offsets
+ val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+ val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+ if (newPartitionOffsets.keySet != newPartitions) {
+ // We cannot get from offsets for some partitions. It means they got deleted.
+ val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+ reportDataLoss(
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ }
+ logInfo(s"Partitions added: $newPartitionOffsets")
+ newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+ reportDataLoss(
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ }
+
+ // Find deleted partitions, and report data loss if required
+ val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+ if (deletedPartitions.nonEmpty) {
+ reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+ }
+
+ // Use the until partitions to calculate offset ranges to ignore partitions that have
+ // been deleted
+ val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+ // Ignore partitions that we don't know the from offsets.
+ newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+ }.toSeq
+ logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+ val sortedExecutors = getSortedExecutorList()
+ val numExecutors = sortedExecutors.length
+ logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+ // Calculate offset ranges
+ val factories = topicPartitions.flatMap { tp =>
+ val fromOffset = startPartitionOffsets.get(tp).getOrElse {
+ newPartitionOffsets.getOrElse(
+ tp, {
+ // This should not happen since newPartitionOffsets contains all partitions not in
+ // fromPartitionOffsets
+ throw new IllegalStateException(s"$tp doesn't have a from offset")
+ })
+ }
+ val untilOffset = endPartitionOffsets(tp)
+
+ if (untilOffset >= fromOffset) {
+ // This allows cached KafkaConsumers in the executors to be re-used to read the same
+ // partition in every batch.
+ val preferredLoc = if (numExecutors > 0) {
+ Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
+ } else None
+ val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
+ Some(
+ new KafkaMicroBatchDataReaderFactory(
+ range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
+ } else {
+ reportDataLoss(
+ s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed")
+ None
+ }
+ }
+ factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
+ }
+
+ override def getStartOffset: Offset = {
+ KafkaSourceOffset(startPartitionOffsets)
+ }
+
+ override def getEndOffset: Offset = {
+ KafkaSourceOffset(endPartitionOffsets)
+ }
+
+ override def deserializeOffset(json: String): Offset = {
+ KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+ }
+
+ override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+ override def commit(end: Offset): Unit = {}
+
+ override def stop(): Unit = {}
+
+ override def toString(): String = s"Kafka[$kafkaOffsetReader]"
+
+ /**
+ * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
+ * the checkpoint.
+ */
+ private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
+ // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
+ // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
+ // (KAFKA-1894).
+ assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+
+ // SparkSession is required for getting Hadoop configuration for writing to checkpoints
+ assert(SparkSession.getActiveSession.nonEmpty)
+
+ val metadataLog =
+ new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
+ metadataLog.get(0).getOrElse {
+ val offsets = startingOffsets match {
+ case EarliestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
+ case LatestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+ case SpecificOffsetRangeLimit(p) =>
+ kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
+ }
+ metadataLog.add(0, offsets)
+ logInfo(s"Initial offsets: $offsets")
+ offsets
+ }.partitionToOffsets
+ }
+
+ /** Proportionally distribute limit number of offsets among topicpartitions */
+ private def rateLimit(
+ limit: Long,
+ from: PartitionOffsetMap,
+ until: PartitionOffsetMap): PartitionOffsetMap = {
+ val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+ val sizes = until.flatMap {
+ case (tp, end) =>
+ // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+ from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+ val size = end - begin
+ logDebug(s"rateLimit $tp size is $size")
+ if (size > 0) Some(tp -> size) else None
+ }
+ }
+ val total = sizes.values.sum.toDouble
+ if (total < 1) {
+ until
+ } else {
+ until.map {
+ case (tp, end) =>
+ tp -> sizes.get(tp).map { size =>
+ val begin = from.get(tp).getOrElse(fromNew(tp))
+ val prorate = limit * (size / total)
+ // Don't completely starve small topicpartitions
+ val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+ // Paranoia, make sure not to return an offset that's past end
+ Math.min(end, off)
+ }.getOrElse(end)
+ }
+ }
+ }
+
+ private def getSortedExecutorList(): Array[String] = {
+
+ def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+ if (a.host == b.host) {
+ a.executorId > b.executorId
+ } else {
+ a.host > b.host
+ }
+ }
+
+ val bm = SparkEnv.get.blockManager
+ bm.master.getPeers(bm.blockManagerId).toArray
+ .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+ .sortWith(compare)
+ .map(_.toString)
+ }
+
+ /**
+ * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+ * Otherwise, just log a warning.
+ */
+ private def reportDataLoss(message: String): Unit = {
+ if (failOnDataLoss) {
+ throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+ } else {
+ logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+ }
+ }
+
+ /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
+ class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
+ extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
+
+ val VERSION = 1
+
+ override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+ out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush
+ }
+
+ override def deserialize(in: InputStream): KafkaSourceOffset = {
+ in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+ // HDFSMetadataLog guarantees that it never creates a partial file.
+ assert(content.length != 0)
+ if (content(0) == 'v') {
+ val indexOfNewLine = content.indexOf("\n")
+ if (indexOfNewLine > 0) {
+ val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+ KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
+ } else {
+ throw new IllegalStateException(
+ s"Log file was malformed: failed to detect the log file version line.")
+ }
+ } else {
+ // The log was generated by Spark 2.1.0
+ KafkaSourceOffset(SerializedOffset(content))
+ }
+ }
+ }
+}
+
+/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReaderFactory(
+ range: KafkaOffsetRange,
+ preferredLoc: Option[String],
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends DataReaderFactory[Row] {
+
+ override def preferredLocations(): Array[String] = preferredLoc.toArray
+
+ override def createDataReader(): DataReader[Row] = new KafkaMicroBatchDataReader(
+ range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+}
+
+/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReader(
+ offsetRange: KafkaOffsetRange,
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends DataReader[Row] with Logging {
+
+ private val consumer = CachedKafkaConsumer.getOrCreate(
+ offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+ private val rangeToRead = resolveRange(offsetRange)
+ private val timestamp = new java.sql.Timestamp(0)
+
+ private var nextOffset = rangeToRead.fromOffset
+ private var nextRow: Row = _
+
+ override def next(): Boolean = {
+ if (nextOffset < rangeToRead.untilOffset) {
+ val cr = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
+ if (cr != null) {
+ timestamp.setTime(cr.timestamp)
+ nextRow = Row(
--- End diff --
Spark needs to convert Row to InternalRow. Right? Do we have an API to avoid this performance regression?
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87446/
Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/892/
Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/907/
Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87446 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87446/testReport)** for PR 20554 at commit [`55c057d`](https://github.com/apache/spark/commit/55c057d1291c927c05192e9b2ce95388fe814c1a).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87465/
Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87444 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87444/testReport)** for PR 20554 at commit [`677f897`](https://github.com/apache/spark/commit/677f89764dd04d6c0088af3bf39d5a049777d079).
---
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r168558972
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -112,14 +112,18 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
query.nonEmpty,
"Cannot add data when there is no query for finding the active kafka source")
- val sources = query.get.logicalPlan.collect {
- case StreamingExecutionRelation(source: KafkaSource, _) => source
- } ++ (query.get.lastExecution match {
- case null => Seq()
- case e => e.logical.collect {
- case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
- }
- })
+ val sources = {
+ query.get.logicalPlan.collect {
+ case StreamingExecutionRelation(source: KafkaSource, _) => source
+ case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => source
+ } ++ (query.get.lastExecution match {
+ case null => Seq()
+ case e => e.logical.collect {
+ case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
+ }
+ })
+ }.distinct
--- End diff --
Is the `distinct` for the self-join test?
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87444/
Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87433/
Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87436/
Test PASSed.
---
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r168626487
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -112,14 +112,18 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
query.nonEmpty,
"Cannot add data when there is no query for finding the active kafka source")
- val sources = query.get.logicalPlan.collect {
- case StreamingExecutionRelation(source: KafkaSource, _) => source
- } ++ (query.get.lastExecution match {
- case null => Seq()
- case e => e.logical.collect {
- case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
- }
- })
+ val sources = {
+ query.get.logicalPlan.collect {
+ case StreamingExecutionRelation(source: KafkaSource, _) => source
+ case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => source
+ } ++ (query.get.lastExecution match {
+ case null => Seq()
+ case e => e.logical.collect {
+ case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
+ }
+ })
+ }.distinct
--- End diff --
yes.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Build finished. Test PASSed.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87436 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87436/testReport)** for PR 20554 at commit [`f9983d9`](https://github.com/apache/spark/commit/f9983d937b97b2ba9f020f370b8aefdd353a654b).
---
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167124346
--- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin ---
@@ -0,0 +1,2 @@
+0v99999
+{"kafka-initial-offset-future-version":{"2":2,"1":1,"0":0}}
--- End diff --
note: the trailing newline should be removed to keep this file consistent with the other offset files
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87446 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87446/testReport)** for PR 20554 at commit [`55c057d`](https://github.com/apache/spark/commit/55c057d1291c927c05192e9b2ce95388fe814c1a).
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167124768
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
)
}
- testWithUninterruptibleThread(
- "deserialization of initial offset with Spark 2.1.0") {
- withTempDir { metadataPath =>
- val topic = newTopic
- testUtils.createTopic(topic, partitions = 3)
-
- val provider = new KafkaSourceProvider
- val parameters = Map(
- "kafka.bootstrap.servers" -> testUtils.brokerAddress,
- "subscribe" -> topic
- )
- val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
- "", parameters)
- source.getOffset.get // Write initial offset
-
- // Make sure Spark 2.1.0 will throw an exception when reading the new log
- intercept[java.lang.IllegalArgumentException] {
- // Simulate how Spark 2.1.0 reads the log
- Utils.tryWithResource(new FileInputStream(metadataPath.getAbsolutePath + "/0")) { in =>
- val length = in.read()
- val bytes = new Array[Byte](length)
- in.read(bytes)
- KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
- }
- }
- }
- }
-
- testWithUninterruptibleThread("deserialization of initial offset written by Spark 2.1.0") {
+ test("deserialization of initial offset written by Spark 2.1.0") {
withTempDir { metadataPath =>
--- End diff --
Changed the two tests below so that they no longer use the source/reader directly (that made them too low-level and implementation-dependent); instead they now actually run a streaming query using the sample initial offset files in `test/resources`.
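For illustration, a minimal sketch of what such a test can look like. The helper names (`test`, `withTempDir`) and the resource file name come from the surrounding diffs; the checkpoint layout (`sources/0/0`) and the omitted query/assertion part are assumptions for illustration only, not the PR's actual test body.

```scala
test("initial offsets written by Spark 2.1.0 are readable (sketch)") {
  withTempDir { checkpointDir =>
    // Copy the pre-baked 2.1.0-format offset file from test/resources into the
    // location where the offset log of source 0 expects batch 0 (layout assumed).
    val src = java.nio.file.Paths.get(
      getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").toURI)
    val dst = new java.io.File(checkpointDir, "sources/0/0").toPath
    java.nio.file.Files.createDirectories(dst.getParent)
    java.nio.file.Files.copy(src, dst)

    // A streaming query started against this checkpoint should resume from the
    // deserialized 2.1.0-format offsets instead of failing to read them.
  }
}
```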
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167123513
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
+ * must make sure all messages in a topic have been processed before deleting the topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query that uses Kafka may hang and cannot be
+ * stopped. To avoid this issue, make sure to stop the query before stopping the Kafka brokers,
+ * and do not use incorrect broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+ kafkaOffsetReader: KafkaOffsetReader,
+ executorKafkaParams: ju.Map[String, Object],
+ options: DataSourceOptions,
+ metadataPath: String,
+ startingOffsets: KafkaOffsetRangeLimit,
+ failOnDataLoss: Boolean)
+ extends MicroBatchReader with Logging {
+
+ type PartitionOffsetMap = Map[TopicPartition, Long]
+
+ private var startPartitionOffsets: PartitionOffsetMap = _
+ private var endPartitionOffsets: PartitionOffsetMap = _
+
+ private val pollTimeoutMs = options.getLong(
+ "kafkaConsumer.pollTimeoutMs",
+ SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+ private val maxOffsetsPerTrigger =
+ Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+ /**
+ * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+ * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+ * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+ */
+ private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
+
+ override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
+ // Make sure initialPartitionOffsets is initialized
+ initialPartitionOffsets
+
+ startPartitionOffsets = Option(start.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse(initialPartitionOffsets)
+
+ endPartitionOffsets = Option(end.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse {
+ val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+ maxOffsetsPerTrigger.map { maxOffsets =>
+ rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+ }.getOrElse {
+ latestPartitionOffsets
+ }
+ }
+ }
+
+ override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
+ // Find the new partitions, and get their earliest offsets
+ val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+ val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+ if (newPartitionOffsets.keySet != newPartitions) {
+ // We cannot get from offsets for some partitions. It means they got deleted.
+ val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+ reportDataLoss(
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ }
+ logInfo(s"Partitions added: $newPartitionOffsets")
+ newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+ reportDataLoss(
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ }
+
+ // Find deleted partitions, and report data loss if required
+ val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+ if (deletedPartitions.nonEmpty) {
+ reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+ }
+
+ // Use the until partitions to calculate offset ranges to ignore partitions that have
+ // been deleted
+ val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+ // Ignore partitions that we don't know the from offsets.
+ newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+ }.toSeq
+ logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+ val sortedExecutors = getSortedExecutorList()
+ val numExecutors = sortedExecutors.length
+ logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+ // Calculate offset ranges
+ val factories = topicPartitions.flatMap { tp =>
+ val fromOffset = startPartitionOffsets.get(tp).getOrElse {
+ newPartitionOffsets.getOrElse(
+ tp, {
+ // This should not happen since newPartitionOffsets contains all partitions not in
+ // fromPartitionOffsets
+ throw new IllegalStateException(s"$tp doesn't have a from offset")
+ })
+ }
+ val untilOffset = endPartitionOffsets(tp)
+
+ if (untilOffset >= fromOffset) {
+ // This allows cached KafkaConsumers in the executors to be re-used to read the same
+ // partition in every batch.
+ val preferredLoc = if (numExecutors > 0) {
+ Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
+ } else None
+ val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
+ Some(
+ new KafkaMicroBatchDataReaderFactory(
+ range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
+ } else {
+ reportDataLoss(
+ s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed")
+ None
+ }
+ }
+ factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
+ }
+
+ override def getStartOffset: Offset = {
+ KafkaSourceOffset(startPartitionOffsets)
+ }
+
+ override def getEndOffset: Offset = {
+ KafkaSourceOffset(endPartitionOffsets)
+ }
+
+ override def deserializeOffset(json: String): Offset = {
+ KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+ }
+
+ override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+ override def commit(end: Offset): Unit = {}
+
+ override def stop(): Unit = {}
+
+ override def toString(): String = s"Kafka[$kafkaOffsetReader]"
+
+ /**
+ * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
+ * the checkpoint.
+ */
+ private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
+ // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
+ // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
+ // (KAFKA-1894).
+ assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+
+ // SparkSession is required for getting Hadoop configuration for writing to checkpoints
+ assert(SparkSession.getActiveSession.nonEmpty)
+
+ val metadataLog =
+ new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
+ metadataLog.get(0).getOrElse {
+ val offsets = startingOffsets match {
+ case EarliestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
+ case LatestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+ case SpecificOffsetRangeLimit(p) =>
+ kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
+ }
+ metadataLog.add(0, offsets)
+ logInfo(s"Initial offsets: $offsets")
+ offsets
+ }.partitionToOffsets
+ }
+
+ /** Proportionally distribute limit number of offsets among topicpartitions */
+ private def rateLimit(
+ limit: Long,
+ from: PartitionOffsetMap,
+ until: PartitionOffsetMap): PartitionOffsetMap = {
+ val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+ val sizes = until.flatMap {
+ case (tp, end) =>
+ // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+ from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+ val size = end - begin
+ logDebug(s"rateLimit $tp size is $size")
+ if (size > 0) Some(tp -> size) else None
+ }
+ }
+ val total = sizes.values.sum.toDouble
+ if (total < 1) {
+ until
+ } else {
+ until.map {
+ case (tp, end) =>
+ tp -> sizes.get(tp).map { size =>
+ val begin = from.get(tp).getOrElse(fromNew(tp))
+ val prorate = limit * (size / total)
+ // Don't completely starve small topicpartitions
+ val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+ // Paranoia, make sure not to return an offset that's past end
+ Math.min(end, off)
+ }.getOrElse(end)
+ }
+ }
+ }
+
+ private def getSortedExecutorList(): Array[String] = {
+
+ def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+ if (a.host == b.host) {
+ a.executorId > b.executorId
+ } else {
+ a.host > b.host
+ }
+ }
+
+ val bm = SparkEnv.get.blockManager
+ bm.master.getPeers(bm.blockManagerId).toArray
+ .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+ .sortWith(compare)
+ .map(_.toString)
+ }
+
+ /**
+ * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+ * Otherwise, just log a warning.
+ */
+ private def reportDataLoss(message: String): Unit = {
+ if (failOnDataLoss) {
+ throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+ } else {
+ logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+ }
+ }
+
+ /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
+ class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
+ extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
+
+ val VERSION = 1
+
+ override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+ out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush
+ }
+
+ override def deserialize(in: InputStream): KafkaSourceOffset = {
+ in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+ // HDFSMetadataLog guarantees that it never creates a partial file.
+ assert(content.length != 0)
+ if (content(0) == 'v') {
+ val indexOfNewLine = content.indexOf("\n")
+ if (indexOfNewLine > 0) {
+ val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+ KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
+ } else {
+ throw new IllegalStateException(
+ s"Log file was malformed: failed to detect the log file version line.")
+ }
+ } else {
+ // The log was generated by Spark 2.1.0
+ KafkaSourceOffset(SerializedOffset(content))
+ }
+ }
+ }
+}
+
+/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReaderFactory(
+ range: KafkaOffsetRange,
+ preferredLoc: Option[String],
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends DataReaderFactory[Row] {
+
+ override def preferredLocations(): Array[String] = preferredLoc.toArray
+
+ override def createDataReader(): DataReader[Row] = new KafkaMicroBatchDataReader(
+ range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+}
+
+/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReader(
+ offsetRange: KafkaOffsetRange,
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends DataReader[Row] with Logging {
+
+ private val consumer = CachedKafkaConsumer.getOrCreate(
+ offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+ private val rangeToRead = resolveRange(offsetRange)
+ private val timestamp = new java.sql.Timestamp(0)
+
+ private var nextOffset = rangeToRead.fromOffset
+ private var nextRow: Row = _
+
+ override def next(): Boolean = {
--- End diff --
This code has been copied from the `KafkaSourceRDD`'s iterator code.
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala#L148
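As background, the `DataReader` implemented here follows a simple pull-style contract; a consumer drives it roughly like the hypothetical helper below (illustration only, the real executor-side plumbing lives in Spark's DataSourceV2 scan execution, not in this sketch):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}

object DataReaderContractSketch { // hypothetical helper, not part of the PR
  def drain(factory: DataReaderFactory[Row])(process: Row => Unit): Unit = {
    val reader: DataReader[Row] = factory.createDataReader()
    try {
      while (reader.next()) {   // advance; returns false once the offset range is exhausted
        process(reader.get())   // the record produced by the last successful next()
      }
    } finally {
      reader.close()            // releases resources, e.g. the cached Kafka consumer
    }
  }
}
```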
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/891/
Test PASSed.
---
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167127585
--- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin ---
@@ -1 +1 @@
-2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
\ No newline at end of file
+2{"kafka-initial-offset-2-1-0":{"2":2,"1":1,"0":0}}
--- End diff --
Why does this need to be modified? The point of this file IIUC is to ensure that compatibility is maintained with offsets logged in old versions, so I worry something's wrong if we need to update it.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:
https://github.com/apache/spark/pull/20554
retest this please
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87498 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87498/testReport)** for PR 20554 at commit [`2dea08a`](https://github.com/apache/spark/commit/2dea08a4c5f85991e4ad4c7da886c2e0bf456bb8).
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/20554
LGTM
---
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167124564
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
)
}
- testWithUninterruptibleThread(
--- End diff --
I think this test is superfluous and does not test anything useful. As with the other modified tests, "simulating" an implementation makes for a BAD test, and in this particular case it attempts to simulate the 2.1.0 log, which is no longer necessary.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87497 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87497/testReport)** for PR 20554 at commit [`7f5df22`](https://github.com/apache/spark/commit/7f5df222da2e6cf59ed632b1c05165f1035202f3).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Build finished. Test PASSed.
---
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r168559060
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
)
}
- testWithUninterruptibleThread(
--- End diff --
+1
---
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87436 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87436/testReport)** for PR 20554 at commit [`f9983d9`](https://github.com/apache/spark/commit/f9983d937b97b2ba9f020f370b8aefdd353a654b).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r168591005
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
+ * must make sure all messages in a topic have been processed before deleting the topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query that uses Kafka may hang and cannot be
+ * stopped. To avoid this issue, make sure to stop the query before stopping the Kafka brokers,
+ * and do not use incorrect broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+ kafkaOffsetReader: KafkaOffsetReader,
+ executorKafkaParams: ju.Map[String, Object],
+ options: DataSourceOptions,
+ metadataPath: String,
+ startingOffsets: KafkaOffsetRangeLimit,
+ failOnDataLoss: Boolean)
+ extends MicroBatchReader with Logging {
+
+ type PartitionOffsetMap = Map[TopicPartition, Long]
+
+ private var startPartitionOffsets: PartitionOffsetMap = _
+ private var endPartitionOffsets: PartitionOffsetMap = _
+
+ private val pollTimeoutMs = options.getLong(
+ "kafkaConsumer.pollTimeoutMs",
+ SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+ private val maxOffsetsPerTrigger =
+ Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+ /**
+ * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+ * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+ * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+ */
+ private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
+
+ override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
+ // Make sure initialPartitionOffsets is initialized
+ initialPartitionOffsets
+
+ startPartitionOffsets = Option(start.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse(initialPartitionOffsets)
+
+ endPartitionOffsets = Option(end.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse {
+ val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+ maxOffsetsPerTrigger.map { maxOffsets =>
+ rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+ }.getOrElse {
+ latestPartitionOffsets
+ }
+ }
+ }
+
+ override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
+ // Find the new partitions, and get their earliest offsets
+ val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+ val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+ if (newPartitionOffsets.keySet != newPartitions) {
+ // We cannot get from offsets for some partitions. It means they got deleted.
+ val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+ reportDataLoss(
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ }
+ logInfo(s"Partitions added: $newPartitionOffsets")
+ newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+ reportDataLoss(
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ }
+
+ // Find deleted partitions, and report data loss if required
+ val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+ if (deletedPartitions.nonEmpty) {
+ reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+ }
+
+ // Use the until partitions to calculate offset ranges to ignore partitions that have
+ // been deleted
+ val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+ // Ignore partitions that we don't know the from offsets.
+ newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+ }.toSeq
+ logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+ val sortedExecutors = getSortedExecutorList()
+ val numExecutors = sortedExecutors.length
+ logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+ // Calculate offset ranges
+ val factories = topicPartitions.flatMap { tp =>
+ val fromOffset = startPartitionOffsets.get(tp).getOrElse {
+ newPartitionOffsets.getOrElse(
+ tp, {
+ // This should not happen since newPartitionOffsets contains all partitions not in
+ // fromPartitionOffsets
+ throw new IllegalStateException(s"$tp doesn't have a from offset")
+ })
+ }
+ val untilOffset = endPartitionOffsets(tp)
+
+ if (untilOffset >= fromOffset) {
+ // This allows cached KafkaConsumers in the executors to be re-used to read the same
+ // partition in every batch.
+ val preferredLoc = if (numExecutors > 0) {
+ Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
+ } else None
+ val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
+ Some(
+ new KafkaMicroBatchDataReaderFactory(
+ range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
+ } else {
+ reportDataLoss(
+ s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed")
+ None
+ }
+ }
+ factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
+ }
+
+ override def getStartOffset: Offset = {
+ KafkaSourceOffset(startPartitionOffsets)
+ }
+
+ override def getEndOffset: Offset = {
+ KafkaSourceOffset(endPartitionOffsets)
+ }
+
+ override def deserializeOffset(json: String): Offset = {
+ KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+ }
+
+ override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+ override def commit(end: Offset): Unit = {}
+
+ override def stop(): Unit = {}
+
+ override def toString(): String = s"Kafka[$kafkaOffsetReader]"
+
+ /**
+ * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
+ * the checkpoint.
+ */
+ private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
+ // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
+ // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
+ // (KAFKA-1894).
+ assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+
+ // SparkSession is required for getting Hadoop configuration for writing to checkpoints
+ assert(SparkSession.getActiveSession.nonEmpty)
+
+ val metadataLog =
+ new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
+ metadataLog.get(0).getOrElse {
+ val offsets = startingOffsets match {
+ case EarliestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
+ case LatestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+ case SpecificOffsetRangeLimit(p) =>
+ kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
+ }
+ metadataLog.add(0, offsets)
+ logInfo(s"Initial offsets: $offsets")
+ offsets
+ }.partitionToOffsets
+ }
+
+ /** Proportionally distribute limit number of offsets among topicpartitions */
+ private def rateLimit(
+ limit: Long,
+ from: PartitionOffsetMap,
+ until: PartitionOffsetMap): PartitionOffsetMap = {
+ val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+ val sizes = until.flatMap {
+ case (tp, end) =>
+ // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+ from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+ val size = end - begin
+ logDebug(s"rateLimit $tp size is $size")
+ if (size > 0) Some(tp -> size) else None
+ }
+ }
+ val total = sizes.values.sum.toDouble
+ if (total < 1) {
+ until
+ } else {
+ until.map {
+ case (tp, end) =>
+ tp -> sizes.get(tp).map { size =>
+ val begin = from.get(tp).getOrElse(fromNew(tp))
+ val prorate = limit * (size / total)
+ // Don't completely starve small topicpartitions
+ val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+ // Paranoia, make sure not to return an offset that's past end
+ Math.min(end, off)
+ }.getOrElse(end)
+ }
+ }
+ }
+
+ private def getSortedExecutorList(): Array[String] = {
+
+ def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+ if (a.host == b.host) {
+ a.executorId > b.executorId
+ } else {
+ a.host > b.host
+ }
+ }
+
+ val bm = SparkEnv.get.blockManager
+ bm.master.getPeers(bm.blockManagerId).toArray
+ .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+ .sortWith(compare)
+ .map(_.toString)
+ }
+
+ /**
+ * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+ * Otherwise, just log a warning.
+ */
+ private def reportDataLoss(message: String): Unit = {
+ if (failOnDataLoss) {
+ throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+ } else {
+ logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+ }
+ }
+
+ /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
+ class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
+ extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
+
+ val VERSION = 1
+
+ override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+ out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush
+ }
+
+ override def deserialize(in: InputStream): KafkaSourceOffset = {
+ in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+ // HDFSMetadataLog guarantees that it never creates a partial file.
+ assert(content.length != 0)
+ if (content(0) == 'v') {
+ val indexOfNewLine = content.indexOf("\n")
+ if (indexOfNewLine > 0) {
+ val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+ KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
+ } else {
+ throw new IllegalStateException(
+ s"Log file was malformed: failed to detect the log file version line.")
+ }
+ } else {
+ // The log was generated by Spark 2.1.0
+ KafkaSourceOffset(SerializedOffset(content))
+ }
+ }
+ }
+}
+
+/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReaderFactory(
+ range: KafkaOffsetRange,
+ preferredLoc: Option[String],
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends DataReaderFactory[Row] {
+
+ override def preferredLocations(): Array[String] = preferredLoc.toArray
+
+ override def createDataReader(): DataReader[Row] = new KafkaMicroBatchDataReader(
+ range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+}
+
+/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReader(
+ offsetRange: KafkaOffsetRange,
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends DataReader[Row] with Logging {
+
+ private val consumer = CachedKafkaConsumer.getOrCreate(
+ offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+ private val rangeToRead = resolveRange(offsetRange)
+ private val timestamp = new java.sql.Timestamp(0)
+
+ private var nextOffset = rangeToRead.fromOffset
+ private var nextRow: Row = _
+
+ override def next(): Boolean = {
+ if (nextOffset < rangeToRead.untilOffset) {
+ val cr = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
+ if (cr != null) {
+ timestamp.setTime(cr.timestamp)
+ nextRow = Row(
--- End diff --
I don't necessarily insist on changing it in this PR, but the V2 API does support creating UnsafeRow directly. See the row creation logic in KafkaContinuousDataReader.
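For readers curious what that would look like, here is a rough sketch of producing `UnsafeRow`s directly, modeled loosely on the row-construction logic in `KafkaContinuousDataReader`. The `BufferHolder`/`UnsafeRowWriter` usage, imports, and the helper class name are assumptions about the catalyst APIs of this Spark version, not code from this PR:

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical helper; field order matches the Kafka source schema
// (key, value, topic, partition, offset, timestamp, timestampType).
class KafkaRecordToUnsafeRowConverter {
  private val sharedRow = new UnsafeRow(7)
  private val bufferHolder = new BufferHolder(sharedRow)
  private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)

  def toUnsafeRow(cr: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
    bufferHolder.reset()
    rowWriter.zeroOutNullBytes()
    if (cr.key == null) rowWriter.setNullAt(0) else rowWriter.write(0, cr.key)
    rowWriter.write(1, cr.value)
    rowWriter.write(2, UTF8String.fromString(cr.topic))
    rowWriter.write(3, cr.partition)
    rowWriter.write(4, cr.offset)
    rowWriter.write(5, DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)))
    rowWriter.write(6, cr.timestampType.id)
    sharedRow.setTotalSize(bufferHolder.totalSize())
    sharedRow
  }
}
```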
---
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167123614
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
+ * must make sure all messages in a topic have been processed before deleting the topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query that uses Kafka may hang and cannot be
+ * stopped. To avoid this issue, make sure to stop the query before stopping the Kafka brokers,
+ * and do not use incorrect broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+ kafkaOffsetReader: KafkaOffsetReader,
+ executorKafkaParams: ju.Map[String, Object],
+ options: DataSourceOptions,
+ metadataPath: String,
+ startingOffsets: KafkaOffsetRangeLimit,
+ failOnDataLoss: Boolean)
+ extends MicroBatchReader with Logging {
+
+ type PartitionOffsetMap = Map[TopicPartition, Long]
+
+ private var startPartitionOffsets: PartitionOffsetMap = _
+ private var endPartitionOffsets: PartitionOffsetMap = _
+
+ private val pollTimeoutMs = options.getLong(
+ "kafkaConsumer.pollTimeoutMs",
+ SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+ private val maxOffsetsPerTrigger =
+ Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+ /**
+ * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+ * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+ * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+ */
+ private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
+
+ override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
+ // Make sure initialPartitionOffsets is initialized
+ initialPartitionOffsets
+
+ startPartitionOffsets = Option(start.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse(initialPartitionOffsets)
+
+ endPartitionOffsets = Option(end.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse {
+ val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+ maxOffsetsPerTrigger.map { maxOffsets =>
+ rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+ }.getOrElse {
+ latestPartitionOffsets
+ }
+ }
+ }
+
+ override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
+ // Find the new partitions, and get their earliest offsets
+ val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+ val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+ if (newPartitionOffsets.keySet != newPartitions) {
+ // We cannot get from offsets for some partitions. It means they got deleted.
+ val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+ reportDataLoss(
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ }
+ logInfo(s"Partitions added: $newPartitionOffsets")
+ newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+ reportDataLoss(
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ }
+
+ // Find deleted partitions, and report data loss if required
+ val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+ if (deletedPartitions.nonEmpty) {
+ reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+ }
+
+ // Use the until partitions to calculate offset ranges to ignore partitions that have
+ // been deleted
+ val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+ // Ignore partitions that we don't know the from offsets.
+ newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+ }.toSeq
+ logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+ val sortedExecutors = getSortedExecutorList()
+ val numExecutors = sortedExecutors.length
+ logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+ // Calculate offset ranges
+ val factories = topicPartitions.flatMap { tp =>
+ val fromOffset = startPartitionOffsets.get(tp).getOrElse {
+ newPartitionOffsets.getOrElse(
+ tp, {
+ // This should not happen since newPartitionOffsets contains all partitions not in
+ // fromPartitionOffsets
+ throw new IllegalStateException(s"$tp doesn't have a from offset")
+ })
+ }
+ val untilOffset = endPartitionOffsets(tp)
+
+ if (untilOffset >= fromOffset) {
+ // This allows cached KafkaConsumers in the executors to be re-used to read the same
+ // partition in every batch.
+ val preferredLoc = if (numExecutors > 0) {
+ Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
+ } else None
+ val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
+ Some(
+ new KafkaMicroBatchDataReaderFactory(
+ range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
+ } else {
+ reportDataLoss(
+ s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed")
+ None
+ }
+ }
+ factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
+ }
+
+ override def getStartOffset: Offset = {
+ KafkaSourceOffset(startPartitionOffsets)
+ }
+
+ override def getEndOffset: Offset = {
+ KafkaSourceOffset(endPartitionOffsets)
+ }
+
+ override def deserializeOffset(json: String): Offset = {
+ KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+ }
+
+ override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+ override def commit(end: Offset): Unit = {}
+
+ override def stop(): Unit = {}
+
+ override def toString(): String = s"Kafka[$kafkaOffsetReader]"
+
+ /**
+ * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
+ * the checkpoint.
+ */
+ private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
+ // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
+ // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
+ // (KAFKA-1894).
+ assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+
+ // SparkSession is required for getting Hadoop configuration for writing to checkpoints
+ assert(SparkSession.getActiveSession.nonEmpty)
+
+ val metadataLog =
+ new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
+ metadataLog.get(0).getOrElse {
+ val offsets = startingOffsets match {
+ case EarliestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
+ case LatestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+ case SpecificOffsetRangeLimit(p) =>
+ kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
+ }
+ metadataLog.add(0, offsets)
+ logInfo(s"Initial offsets: $offsets")
+ offsets
+ }.partitionToOffsets
+ }
+
+ /** Proportionally distribute limit number of offsets among topicpartitions */
+ private def rateLimit(
+ limit: Long,
+ from: PartitionOffsetMap,
+ until: PartitionOffsetMap): PartitionOffsetMap = {
+ val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+ val sizes = until.flatMap {
+ case (tp, end) =>
+ // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+ from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+ val size = end - begin
+ logDebug(s"rateLimit $tp size is $size")
+ if (size > 0) Some(tp -> size) else None
+ }
+ }
+ val total = sizes.values.sum.toDouble
+ if (total < 1) {
+ until
+ } else {
+ until.map {
+ case (tp, end) =>
+ tp -> sizes.get(tp).map { size =>
+ val begin = from.get(tp).getOrElse(fromNew(tp))
+ val prorate = limit * (size / total)
+ // Don't completely starve small topicpartitions
+ val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+ // Paranoia, make sure not to return an offset that's past end
+ Math.min(end, off)
+ }.getOrElse(end)
+ }
+ }
+ }
+
+ private def getSortedExecutorList(): Array[String] = {
+
+ def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+ if (a.host == b.host) {
+ a.executorId > b.executorId
+ } else {
+ a.host > b.host
+ }
+ }
+
+ val bm = SparkEnv.get.blockManager
+ bm.master.getPeers(bm.blockManagerId).toArray
+ .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+ .sortWith(compare)
+ .map(_.toString)
+ }
+
+ /**
+ * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+ * Otherwise, just log a warning.
+ */
+ private def reportDataLoss(message: String): Unit = {
+ if (failOnDataLoss) {
+ throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+ } else {
+ logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+ }
+ }
+
+ /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
+ class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
+ extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
+
+ val VERSION = 1
+
+ override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+ out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush
+ }
+
+ override def deserialize(in: InputStream): KafkaSourceOffset = {
+ in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+ // HDFSMetadataLog guarantees that it never creates a partial file.
+ assert(content.length != 0)
+ if (content(0) == 'v') {
+ val indexOfNewLine = content.indexOf("\n")
+ if (indexOfNewLine > 0) {
+ val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+ KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
+ } else {
+ throw new IllegalStateException(
+ s"Log file was malformed: failed to detect the log file version line.")
+ }
+ } else {
+ // The log was generated by Spark 2.1.0
+ KafkaSourceOffset(SerializedOffset(content))
+ }
+ }
+ }
+}
+
+/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReaderFactory(
+ range: KafkaOffsetRange,
+ preferredLoc: Option[String],
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends DataReaderFactory[Row] {
+
+ override def preferredLocations(): Array[String] = preferredLoc.toArray
+
+ override def createDataReader(): DataReader[Row] = new KafkaMicroBatchDataReader(
+ range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+}
+
+/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReader(
+ offsetRange: KafkaOffsetRange,
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends DataReader[Row] with Logging {
+
+ private val consumer = CachedKafkaConsumer.getOrCreate(
+ offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+ private val rangeToRead = resolveRange(offsetRange)
+ private val timestamp = new java.sql.Timestamp(0)
+
+ private var nextOffset = rangeToRead.fromOffset
+ private var nextRow: Row = _
+
+ override def next(): Boolean = {
+ if (nextOffset < rangeToRead.untilOffset) {
+ val cr = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
+ if (cr != null) {
+ timestamp.setTime(cr.timestamp)
+ nextRow = Row(
+ cr.key,
+ cr.value,
+ cr.topic,
+ cr.partition,
+ cr.offset,
+ timestamp,
+ cr.timestampType.id)
+ true
+ } else {
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ override def get(): Row = {
+ assert(nextRow != null)
+ nextOffset += 1
+ nextRow
+ }
+
+ override def close(): Unit = {
+ // Indicate that we're no longer using this consumer
+ CachedKafkaConsumer.releaseKafkaConsumer(
+ offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+ }
+
+ private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
+ if (range.fromOffset < 0 || range.untilOffset < 0) {
+ // Late bind the offset range
+ val availableOffsetRange = consumer.getAvailableOffsetRange()
+ val fromOffset = if (range.fromOffset < 0) {
+ assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST,
+ s"earliest offset ${range.fromOffset} does not equal ${KafkaOffsetRangeLimit.EARLIEST}")
+ availableOffsetRange.earliest
+ } else {
+ range.fromOffset
+ }
+ val untilOffset = if (range.untilOffset < 0) {
+ assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST,
+ s"latest offset ${range.untilOffset} does not equal ${KafkaOffsetRangeLimit.LATEST}")
+ availableOffsetRange.latest
+ } else {
+ range.untilOffset
+ }
+ KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset)
+ } else {
+ range
+ }
+ }
+}
+
+private[kafka010] case class KafkaOffsetRange(
+ topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long)
+
+
--- End diff --
nit: will remove this extra line.
---
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167123917
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example, if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it
+ * consistent with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical,
+ * the user must make sure all messages in a topic have been processed before deleting the topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query that uses Kafka may fail to stop.
+ * To avoid this issue, make sure to stop the query before stopping the Kafka brokers, and do not
+ * use incorrect broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+ kafkaOffsetReader: KafkaOffsetReader,
+ executorKafkaParams: ju.Map[String, Object],
+ options: DataSourceOptions,
+ metadataPath: String,
+ startingOffsets: KafkaOffsetRangeLimit,
+ failOnDataLoss: Boolean)
+ extends MicroBatchReader with Logging {
+
+ type PartitionOffsetMap = Map[TopicPartition, Long]
+
+ private var startPartitionOffsets: PartitionOffsetMap = _
+ private var endPartitionOffsets: PartitionOffsetMap = _
+
+ private val pollTimeoutMs = options.getLong(
+ "kafkaConsumer.pollTimeoutMs",
+ SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+ private val maxOffsetsPerTrigger =
+ Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+ /**
+ * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+ * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+ * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+ */
+ private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
+
+ override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
+ // Make sure initialPartitionOffsets is initialized
+ initialPartitionOffsets
+
+ startPartitionOffsets = Option(start.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse(initialPartitionOffsets)
+
+ endPartitionOffsets = Option(end.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse {
+ val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+ maxOffsetsPerTrigger.map { maxOffsets =>
+ rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+ }.getOrElse {
+ latestPartitionOffsets
+ }
+ }
+ }
+
+ override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
+ // Find the new partitions, and get their earliest offsets
+ val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+ val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+ if (newPartitionOffsets.keySet != newPartitions) {
+ // We cannot get from offsets for some partitions. It means they got deleted.
+ val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+ reportDataLoss(
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ }
+ logInfo(s"Partitions added: $newPartitionOffsets")
+ newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+ reportDataLoss(
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ }
+
+ // Find deleted partitions, and report data loss if required
+ val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+ if (deletedPartitions.nonEmpty) {
+ reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+ }
+
+ // Use the until partitions to calculate offset ranges to ignore partitions that have
+ // been deleted
+ val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+ // Ignore partitions that we don't know the from offsets.
+ newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+ }.toSeq
+ logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+ val sortedExecutors = getSortedExecutorList()
+ val numExecutors = sortedExecutors.length
+ logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+ // Calculate offset ranges
+ val factories = topicPartitions.flatMap { tp =>
+ val fromOffset = startPartitionOffsets.get(tp).getOrElse {
+ newPartitionOffsets.getOrElse(
+ tp, {
+ // This should not happen since newPartitionOffsets contains all partitions not in
+ // fromPartitionOffsets
+ throw new IllegalStateException(s"$tp doesn't have a from offset")
+ })
+ }
+ val untilOffset = endPartitionOffsets(tp)
+
+ if (untilOffset >= fromOffset) {
+ // This allows cached KafkaConsumers in the executors to be re-used to read the same
+ // partition in every batch.
+ val preferredLoc = if (numExecutors > 0) {
+ Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
+ } else None
+ val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
+ Some(
+ new KafkaMicroBatchDataReaderFactory(
+ range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
+ } else {
+ reportDataLoss(
+ s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed")
+ None
+ }
+ }
+ factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
+ }
+
+ override def getStartOffset: Offset = {
+ KafkaSourceOffset(startPartitionOffsets)
+ }
+
+ override def getEndOffset: Offset = {
+ KafkaSourceOffset(endPartitionOffsets)
+ }
+
+ override def deserializeOffset(json: String): Offset = {
+ KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+ }
+
+ override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+ override def commit(end: Offset): Unit = {}
+
+ override def stop(): Unit = {}
+
+ override def toString(): String = s"Kafka[$kafkaOffsetReader]"
+
+ /**
+ * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
+ * the checkpoint.
+ */
+ private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
--- End diff --
Moved the code from `KafkaSource.initialPartitionOffsets` into a separate function for cleanliness.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87239 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87239/testReport)** for PR 20554 at commit [`05c9d20`](https://github.com/apache/spark/commit/05c9d20da4361d631d8839bd4a45e4966964afa0).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87497 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87497/testReport)** for PR 20554 at commit [`7f5df22`](https://github.com/apache/spark/commit/7f5df222da2e6cf59ed632b1c05165f1035202f3).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r168625742
--- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin ---
@@ -1 +1 @@
-2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
\ No newline at end of file
+2{"kafka-initial-offset-2-1-0":{"2":2,"1":1,"0":0}}
--- End diff --
I modified this file to make the test "deserialization of initial offset written by Spark 2.1.0" stronger. See the updated test. The way it works now is that we start the query from the earliest offset while simultaneously having checkpointed initial offsets that are NOT at offset 0, and we check that the query reads its first offsets from those checkpointed initial offsets and not from the earliest offsets available in the topic. Hence I am changing the file slightly: the values, not the format.
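To make the intent concrete, here is a small sketch (illustrative only, not the suite's code) of the offsets that the modified file pins. With startingOffsets = "earliest", the first batch must begin at these checkpointed offsets rather than at the earliest offsets currently available in the topic:

    import org.apache.kafka.common.TopicPartition

    // Initial offsets recorded in kafka-source-initial-offset-version-2.1.0.bin:
    // {"2": 2, "1": 1, "0": 0} for topic "kafka-initial-offset-2-1-0".
    val pinnedInitialOffsets = Map(
      new TopicPartition("kafka-initial-offset-2-1-0", 0) -> 0L,
      new TopicPartition("kafka-initial-offset-2-1-0", 1) -> 1L,
      new TopicPartition("kafka-initial-offset-2-1-0", 2) -> 2L)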
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87238 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87238/testReport)** for PR 20554 at commit [`3ed2a50`](https://github.com/apache/spark/commit/3ed2a509276194214875f39e1e18d8093155c54c).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r168366098
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
)
}
- testWithUninterruptibleThread(
--- End diff --
Added
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167124308
--- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin ---
@@ -1 +1 @@
-2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
\ No newline at end of file
+2{"kafka-initial-offset-2-1-0":{"2":2,"1":1,"0":0}}
--- End diff --
note: should remove the newline to keep it consistent.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167123199
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -408,8 +401,27 @@ private[kafka010] object KafkaSourceProvider extends Logging {
private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
+
val TOPIC_OPTION_KEY = "topic"
+ val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
--- End diff --
Moved this from KafkaSource to this class because this is used by multiple reader classes and therefore should be present in the higher level class (e.g. the provider class).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87466 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87466/testReport)** for PR 20554 at commit [`ba4724b`](https://github.com/apache/spark/commit/ba4724bfbae88266c2b565c709e1f4ff4794ef57).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/881/
Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Build finished. Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test FAILed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r168558562
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -306,7 +307,7 @@ private[kafka010] class KafkaSource(
kafkaReader.close()
}
- override def toString(): String = s"KafkaSource[$kafkaReader]"
+ override def toString(): String = s"KafkaSourceV1[$kafkaReader]"
--- End diff --
good catch
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r168625863
--- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin ---
@@ -0,0 +1,2 @@
+0v99999
+{"kafka-initial-offset-future-version":{"2":2,"1":1,"0":0}}
--- End diff --
done
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87444 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87444/testReport)** for PR 20554 at commit [`677f897`](https://github.com/apache/spark/commit/677f89764dd04d6c0088af3bf39d5a049777d079).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:
https://github.com/apache/spark/pull/20554
@jose-torres @zsxwing please take a look.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167807584
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example, if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it
+ * consistent with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical,
+ * the user must make sure all messages in a topic have been processed before deleting the topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query that uses Kafka may fail to stop.
+ * To avoid this issue, make sure to stop the query before stopping the Kafka brokers, and do not
+ * use incorrect broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+ kafkaOffsetReader: KafkaOffsetReader,
+ executorKafkaParams: ju.Map[String, Object],
+ options: DataSourceOptions,
+ metadataPath: String,
+ startingOffsets: KafkaOffsetRangeLimit,
+ failOnDataLoss: Boolean)
+ extends MicroBatchReader with Logging {
+
+ type PartitionOffsetMap = Map[TopicPartition, Long]
+
+ private var startPartitionOffsets: PartitionOffsetMap = _
+ private var endPartitionOffsets: PartitionOffsetMap = _
+
+ private val pollTimeoutMs = options.getLong(
+ "kafkaConsumer.pollTimeoutMs",
+ SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+ private val maxOffsetsPerTrigger =
+ Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+ /**
+ * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+ * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+ * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+ */
+ private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
+
+ override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
+ // Make sure initialPartitionOffsets is initialized
+ initialPartitionOffsets
+
+ startPartitionOffsets = Option(start.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse(initialPartitionOffsets)
+
+ endPartitionOffsets = Option(end.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse {
+ val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+ maxOffsetsPerTrigger.map { maxOffsets =>
+ rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+ }.getOrElse {
+ latestPartitionOffsets
+ }
+ }
+ }
+
+ override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
+ // Find the new partitions, and get their earliest offsets
+ val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+ val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+ if (newPartitionOffsets.keySet != newPartitions) {
+ // We cannot get from offsets for some partitions. It means they got deleted.
+ val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+ reportDataLoss(
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ }
+ logInfo(s"Partitions added: $newPartitionOffsets")
+ newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+ reportDataLoss(
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ }
+
+ // Find deleted partitions, and report data loss if required
+ val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+ if (deletedPartitions.nonEmpty) {
+ reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+ }
+
+ // Use the until partitions to calculate offset ranges to ignore partitions that have
+ // been deleted
+ val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+ // Ignore partitions that we don't know the from offsets.
+ newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+ }.toSeq
+ logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+ val sortedExecutors = getSortedExecutorList()
+ val numExecutors = sortedExecutors.length
+ logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+ // Calculate offset ranges
+ val factories = topicPartitions.flatMap { tp =>
+ val fromOffset = startPartitionOffsets.get(tp).getOrElse {
+ newPartitionOffsets.getOrElse(
+ tp, {
+ // This should not happen since newPartitionOffsets contains all partitions not in
+ // fromPartitionOffsets
+ throw new IllegalStateException(s"$tp doesn't have a from offset")
+ })
+ }
+ val untilOffset = endPartitionOffsets(tp)
+
+ if (untilOffset >= fromOffset) {
+ // This allows cached KafkaConsumers in the executors to be re-used to read the same
+ // partition in every batch.
+ val preferredLoc = if (numExecutors > 0) {
+ Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
+ } else None
+ val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
+ Some(
+ new KafkaMicroBatchDataReaderFactory(
+ range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
+ } else {
+ reportDataLoss(
+ s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed")
+ None
+ }
+ }
+ factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
+ }
+
+ override def getStartOffset: Offset = {
+ KafkaSourceOffset(startPartitionOffsets)
+ }
+
+ override def getEndOffset: Offset = {
+ KafkaSourceOffset(endPartitionOffsets)
+ }
+
+ override def deserializeOffset(json: String): Offset = {
+ KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+ }
+
+ override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+ override def commit(end: Offset): Unit = {}
+
+ override def stop(): Unit = {}
--- End diff --
This method should close the reader.
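A minimal sketch of the suggested fix (assuming the micro-batch reader owns `kafkaOffsetReader` the same way the v1 `KafkaSource` owns `kafkaReader`, which it closes in its own `stop()`):

    override def stop(): Unit = {
      // Release the Kafka consumer held by the offset reader, mirroring
      // what KafkaSource.stop() does in the v1 implementation.
      kafkaOffsetReader.close()
    }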
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/884/
Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/20554
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87465 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87465/testReport)** for PR 20554 at commit [`66f3ef2`](https://github.com/apache/spark/commit/66f3ef2d70bf9e7468bd28ee4a4f34c35726fd98).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167123713
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example, if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it
+ * consistent with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical,
+ * the user must make sure all messages in a topic have been processed before deleting the topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query that uses Kafka may fail to stop.
+ * To avoid this issue, make sure to stop the query before stopping the Kafka brokers, and do not
+ * use incorrect broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+ kafkaOffsetReader: KafkaOffsetReader,
+ executorKafkaParams: ju.Map[String, Object],
+ options: DataSourceOptions,
+ metadataPath: String,
+ startingOffsets: KafkaOffsetRangeLimit,
+ failOnDataLoss: Boolean)
+ extends MicroBatchReader with Logging {
+
+ type PartitionOffsetMap = Map[TopicPartition, Long]
+
+ private var startPartitionOffsets: PartitionOffsetMap = _
+ private var endPartitionOffsets: PartitionOffsetMap = _
+
+ private val pollTimeoutMs = options.getLong(
+ "kafkaConsumer.pollTimeoutMs",
+ SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+ private val maxOffsetsPerTrigger =
+ Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+ /**
+ * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+ * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+ * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+ */
+ private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
+
+ override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
+ // Make sure initialPartitionOffsets is initialized
+ initialPartitionOffsets
+
+ startPartitionOffsets = Option(start.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse(initialPartitionOffsets)
+
+ endPartitionOffsets = Option(end.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse {
+ val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+ maxOffsetsPerTrigger.map { maxOffsets =>
+ rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+ }.getOrElse {
+ latestPartitionOffsets
+ }
+ }
+ }
+
+ override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
+ // Find the new partitions, and get their earliest offsets
+ val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+ val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+ if (newPartitionOffsets.keySet != newPartitions) {
+ // We cannot get from offsets for some partitions. It means they got deleted.
+ val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+ reportDataLoss(
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ }
+ logInfo(s"Partitions added: $newPartitionOffsets")
+ newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+ reportDataLoss(
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ }
+
+ // Find deleted partitions, and report data loss if required
+ val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+ if (deletedPartitions.nonEmpty) {
+ reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+ }
+
+ // Use the until partitions to calculate offset ranges to ignore partitions that have
+ // been deleted
+ val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+ // Ignore partitions that we don't know the from offsets.
+ newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+ }.toSeq
+ logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+ val sortedExecutors = getSortedExecutorList()
+ val numExecutors = sortedExecutors.length
+ logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+ // Calculate offset ranges
+ val factories = topicPartitions.flatMap { tp =>
+ val fromOffset = startPartitionOffsets.get(tp).getOrElse {
+ newPartitionOffsets.getOrElse(
+ tp, {
+ // This should not happen since newPartitionOffsets contains all partitions not in
+ // fromPartitionOffsets
+ throw new IllegalStateException(s"$tp doesn't have a from offset")
+ })
+ }
+ val untilOffset = endPartitionOffsets(tp)
+
+ if (untilOffset >= fromOffset) {
+ // This allows cached KafkaConsumers in the executors to be re-used to read the same
+ // partition in every batch.
+ val preferredLoc = if (numExecutors > 0) {
+ Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
+ } else None
+ val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
+ Some(
+ new KafkaMicroBatchDataReaderFactory(
+ range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
+ } else {
+ reportDataLoss(
+ s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed")
+ None
+ }
+ }
+ factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
+ }
+
+ override def getStartOffset: Offset = {
+ KafkaSourceOffset(startPartitionOffsets)
+ }
+
+ override def getEndOffset: Offset = {
+ KafkaSourceOffset(endPartitionOffsets)
+ }
+
+ override def deserializeOffset(json: String): Offset = {
+ KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+ }
+
+ override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+ override def commit(end: Offset): Unit = {}
+
+ override def stop(): Unit = {}
+
+ override def toString(): String = s"Kafka[$kafkaOffsetReader]"
+
+ /**
+ * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
+ * the checkpoint.
+ */
+ private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
+ // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
+ // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
+ // (KAFKA-1894).
+ assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+
+ // SparkSession is required for getting Hadoop configuration for writing to checkpoints
+ assert(SparkSession.getActiveSession.nonEmpty)
+
+ val metadataLog =
+ new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
+ metadataLog.get(0).getOrElse {
+ val offsets = startingOffsets match {
+ case EarliestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
+ case LatestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+ case SpecificOffsetRangeLimit(p) =>
+ kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
+ }
+ metadataLog.add(0, offsets)
+ logInfo(s"Initial offsets: $offsets")
+ offsets
+ }.partitionToOffsets
+ }
+
+ /** Proportionally distribute limit number of offsets among topicpartitions */
+ private def rateLimit(
+ limit: Long,
+ from: PartitionOffsetMap,
+ until: PartitionOffsetMap): PartitionOffsetMap = {
+ val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+ val sizes = until.flatMap {
+ case (tp, end) =>
+ // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+ from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+ val size = end - begin
+ logDebug(s"rateLimit $tp size is $size")
+ if (size > 0) Some(tp -> size) else None
+ }
+ }
+ val total = sizes.values.sum.toDouble
+ if (total < 1) {
+ until
+ } else {
+ until.map {
+ case (tp, end) =>
+ tp -> sizes.get(tp).map { size =>
+ val begin = from.get(tp).getOrElse(fromNew(tp))
+ val prorate = limit * (size / total)
+ // Don't completely starve small topicpartitions
+ val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+ // Paranoia, make sure not to return an offset that's past end
+ Math.min(end, off)
+ }.getOrElse(end)
+ }
+ }
+ }
+
+ private def getSortedExecutorList(): Array[String] = {
--- End diff --
Moved from the `object KafkaSource` to inside this `KafkaMicroBatchReader` class so that the object can be removed completely.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:
https://github.com/apache/spark/pull/20554
@zsxwing can you look at it once again? There are some more changes.
@jose-torres PTAL.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87466/
Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:
https://github.com/apache/spark/pull/20554
LGTM
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167809278
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
)
}
- testWithUninterruptibleThread(
--- End diff --
Agreed. How about just writing a test to make sure we do write 0 at the beginning?
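For illustration, such a check could look roughly like this (a sketch with an assumed checkpoint layout; the real test would first run a query against a temporary checkpoint directory referenced here as `checkpointDir`):

    // The initial offset for source 0 is written to <checkpoint>/sources/0/0.
    // Its first byte should be 0 so that Spark 2.1.0 can still read the file (SPARK-19517).
    val initialOffsetFile = new java.io.File(checkpointDir, "sources/0/0")
    val bytes = java.nio.file.Files.readAllBytes(initialOffsetFile.toPath)
    assert(bytes(0) == 0)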
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87239 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87239/testReport)** for PR 20554 at commit [`05c9d20`](https://github.com/apache/spark/commit/05c9d20da4361d631d8839bd4a45e4966964afa0).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167811474
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -28,50 +28,40 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
-import org.apache.spark.sql.execution.streaming.{Sink, Source}
+import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
/**
- * The provider class for the [[KafkaSource]]. This provider is designed such that it throws
+ * The provider class for all Kafka readers and writers. It is designed such that it throws
* IllegalArgumentException when the Kafka Dataset is created, so that it can catch
* missing options even before the query is started.
*/
private[kafka010] class KafkaSourceProvider extends DataSourceRegister
- with StreamSourceProvider
with StreamSinkProvider
with RelationProvider
with CreatableRelationProvider
with StreamWriteSupport
with ContinuousReadSupport
+ with MicroBatchReadSupport
with Logging {
import KafkaSourceProvider._
override def shortName(): String = "kafka"
/**
- * Returns the name and schema of the source. In addition, it also verifies whether the options
- * are correct and sufficient to create the [[KafkaSource]] when the query is started.
+ * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read batches
+ * of Kafka data in a micro-batch streaming query.
*/
- override def sourceSchema(
- sqlContext: SQLContext,
- schema: Option[StructType],
- providerName: String,
- parameters: Map[String, String]): (String, StructType) = {
- validateStreamOptions(parameters)
- require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
- (shortName(), KafkaOffsetReader.kafkaSchema)
- }
-
- override def createSource(
- sqlContext: SQLContext,
+ def createMicroBatchReader(
--- End diff --
nit: `override`
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Build finished. Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Merged build finished. Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/893/
Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/927/
Test PASSed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167123580
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example, if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it
+ * consistent with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical,
+ * the user must make sure all messages in a topic have been processed before deleting the topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query that uses Kafka may fail to stop.
+ * To avoid this issue, make sure to stop the query before stopping the Kafka brokers, and do not
+ * use incorrect broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+ kafkaOffsetReader: KafkaOffsetReader,
+ executorKafkaParams: ju.Map[String, Object],
+ options: DataSourceOptions,
+ metadataPath: String,
+ startingOffsets: KafkaOffsetRangeLimit,
+ failOnDataLoss: Boolean)
+ extends MicroBatchReader with Logging {
+
+ type PartitionOffsetMap = Map[TopicPartition, Long]
+
+ private var startPartitionOffsets: PartitionOffsetMap = _
+ private var endPartitionOffsets: PartitionOffsetMap = _
+
+ private val pollTimeoutMs = options.getLong(
+ "kafkaConsumer.pollTimeoutMs",
+ SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+ private val maxOffsetsPerTrigger =
+ Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+ /**
+ * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+ * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+ * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+ */
+ private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
+
+ override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
+ // Make sure initialPartitionOffsets is initialized
+ initialPartitionOffsets
+
+ startPartitionOffsets = Option(start.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse(initialPartitionOffsets)
+
+ endPartitionOffsets = Option(end.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse {
+ val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+ maxOffsetsPerTrigger.map { maxOffsets =>
+ rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+ }.getOrElse {
+ latestPartitionOffsets
+ }
+ }
+ }
+
+ override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
+ // Find the new partitions, and get their earliest offsets
+ val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+ val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+ if (newPartitionOffsets.keySet != newPartitions) {
+ // We cannot get from offsets for some partitions. It means they got deleted.
+ val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+ reportDataLoss(
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ }
+ logInfo(s"Partitions added: $newPartitionOffsets")
+ newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+ reportDataLoss(
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ }
+
+ // Find deleted partitions, and report data loss if required
+ val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+ if (deletedPartitions.nonEmpty) {
+ reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+ }
+
+ // Use the until partitions to calculate offset ranges to ignore partitions that have
+ // been deleted
+ val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+ // Ignore partitions that we don't know the from offsets.
+ newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+ }.toSeq
+ logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+ val sortedExecutors = getSortedExecutorList()
+ val numExecutors = sortedExecutors.length
+ logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+ // Calculate offset ranges
+ val factories = topicPartitions.flatMap { tp =>
+ val fromOffset = startPartitionOffsets.get(tp).getOrElse {
+ newPartitionOffsets.getOrElse(
+ tp, {
+ // This should not happen since newPartitionOffsets contains all partitions not in
+ // fromPartitionOffsets
+ throw new IllegalStateException(s"$tp doesn't have a from offset")
+ })
+ }
+ val untilOffset = endPartitionOffsets(tp)
+
+ if (untilOffset >= fromOffset) {
+ // This allows cached KafkaConsumers in the executors to be re-used to read the same
+ // partition in every batch.
+ val preferredLoc = if (numExecutors > 0) {
+ Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
+ } else None
+ val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
+ Some(
+ new KafkaMicroBatchDataReaderFactory(
+ range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
+ } else {
+ reportDataLoss(
+ s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed")
+ None
+ }
+ }
+ factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
+ }
+
+ override def getStartOffset: Offset = {
+ KafkaSourceOffset(startPartitionOffsets)
+ }
+
+ override def getEndOffset: Offset = {
+ KafkaSourceOffset(endPartitionOffsets)
+ }
+
+ override def deserializeOffset(json: String): Offset = {
+ KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+ }
+
+ override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+ override def commit(end: Offset): Unit = {}
+
+ override def stop(): Unit = {}
+
+ override def toString(): String = s"Kafka[$kafkaOffsetReader]"
+
+ /**
+ * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
+ * the checkpoint.
+ */
+ private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
+ // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
+ // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
+ // (KAFKA-1894).
+ assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+
+ // SparkSession is required for getting Hadoop configuration for writing to checkpoints
+ assert(SparkSession.getActiveSession.nonEmpty)
+
+ val metadataLog =
+ new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
+ metadataLog.get(0).getOrElse {
+ val offsets = startingOffsets match {
+ case EarliestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
+ case LatestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+ case SpecificOffsetRangeLimit(p) =>
+ kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
+ }
+ metadataLog.add(0, offsets)
+ logInfo(s"Initial offsets: $offsets")
+ offsets
+ }.partitionToOffsets
+ }
+
+ /** Proportionally distribute the `limit` number of offsets among topic-partitions. */
+ private def rateLimit(
+ limit: Long,
+ from: PartitionOffsetMap,
+ until: PartitionOffsetMap): PartitionOffsetMap = {
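+ // Worked example: with limit = 100 and two lagging partitions of sizes 300 and 100
+ // (total = 400), the prorated shares below are 100 * 300/400 = 75 and 100 * 100/400 = 25,
+ // so the capped end offsets become begin + 75 and begin + 25 respectively.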
+ val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+ val sizes = until.flatMap {
+ case (tp, end) =>
+ // If begin isn't defined, something's wrong, but let the data-loss alert logic in
+ // createDataReaderFactories handle it
+ from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+ val size = end - begin
+ logDebug(s"rateLimit $tp size is $size")
+ if (size > 0) Some(tp -> size) else None
+ }
+ }
+ val total = sizes.values.sum.toDouble
+ if (total < 1) {
+ until
+ } else {
+ until.map {
+ case (tp, end) =>
+ tp -> sizes.get(tp).map { size =>
+ val begin = from.get(tp).getOrElse(fromNew(tp))
+ val prorate = limit * (size / total)
+ // Don't completely starve small topicpartitions
+ val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+ // Paranoia, make sure not to return an offset that's past end
+ Math.min(end, off)
+ }.getOrElse(end)
+ }
+ }
+ }
+
+ private def getSortedExecutorList(): Array[String] = {
+
+ def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+ if (a.host == b.host) {
+ a.executorId > b.executorId
+ } else {
+ a.host > b.host
+ }
+ }
+
+ val bm = SparkEnv.get.blockManager
+ bm.master.getPeers(bm.blockManagerId).toArray
+ .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+ .sortWith(compare)
+ .map(_.toString)
+ }
+
+ /**
+ * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+ * Otherwise, just log a warning.
+ */
+ private def reportDataLoss(message: String): Unit = {
+ if (failOnDataLoss) {
+ throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+ } else {
+ logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+ }
+ }
+
+ /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
+ class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
+ extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
+
+ val VERSION = 1
+
+ override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+ out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush
+ }
+
+ override def deserialize(in: InputStream): KafkaSourceOffset = {
+ in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+ // HDFSMetadataLog guarantees that it never creates a partial file.
+ assert(content.length != 0)
+ if (content(0) == 'v') {
+ val indexOfNewLine = content.indexOf("\n")
+ if (indexOfNewLine > 0) {
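+ // parseVersion both parses and validates the "v<N>" prefix, failing fast if the log was
+ // written by a newer, unsupported version; the returned value is not otherwise used here.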
+ val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+ KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
+ } else {
+ throw new IllegalStateException(
+ s"Log file was malformed: failed to detect the log file version line.")
+ }
+ } else {
+ // The log was generated by Spark 2.1.0
+ KafkaSourceOffset(SerializedOffset(content))
+ }
+ }
+ }
+}
+
+/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReaderFactory(
+ range: KafkaOffsetRange,
+ preferredLoc: Option[String],
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends DataReaderFactory[Row] {
+
+ override def preferredLocations(): Array[String] = preferredLoc.toArray
+
+ override def createDataReader(): DataReader[Row] = new KafkaMicroBatchDataReader(
+ range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+}
+
+/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] class KafkaMicroBatchDataReader(
+ offsetRange: KafkaOffsetRange,
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends DataReader[Row] with Logging {
+
+ private val consumer = CachedKafkaConsumer.getOrCreate(
+ offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+ private val rangeToRead = resolveRange(offsetRange)
+ private val timestamp = new java.sql.Timestamp(0)
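+ // Single mutable Timestamp instance reused for every row; next() calls setTime() on it
+ // before building each Row, avoiding a per-record allocation.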
+
+ private var nextOffset = rangeToRead.fromOffset
+ private var nextRow: Row = _
+
+ override def next(): Boolean = {
+ if (nextOffset < rangeToRead.untilOffset) {
+ val cr = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
+ if (cr != null) {
+ timestamp.setTime(cr.timestamp)
+ nextRow = Row(
+ cr.key,
+ cr.value,
+ cr.topic,
+ cr.partition,
+ cr.offset,
+ timestamp,
+ cr.timestampType.id)
+ true
+ } else {
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ override def get(): Row = {
+ assert(nextRow != null)
+ nextOffset += 1
+ nextRow
+ }
+
+ override def close(): Unit = {
+ // Indicate that we're no longer using this consumer
+ CachedKafkaConsumer.releaseKafkaConsumer(
+ offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+ }
+
+ private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
--- End diff --
Same method as this KafkaSourceRDD method.
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala#L187
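For anyone reading the archive without KafkaSourceRDD open: that resolution step late-binds the earliest/latest sentinel offsets (negative values) to the range the cached consumer can actually see at read time. A rough sketch of the logic follows; it is not the literal patch code, and it assumes the getAvailableOffsetRange helper on CachedKafkaConsumer and the KafkaOffsetRangeLimit.EARLIEST/LATEST constants that the kafka-0-10-sql module already defines:

    private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
      if (range.fromOffset < 0 || range.untilOffset < 0) {
        // Late-bind sentinel offsets against what the consumer can currently see
        val available = consumer.getAvailableOffsetRange()
        val fromOffset =
          if (range.fromOffset == KafkaOffsetRangeLimit.EARLIEST) available.earliest
          else range.fromOffset
        val untilOffset =
          if (range.untilOffset == KafkaOffsetRangeLimit.LATEST) available.latest
          else range.untilOffset
        KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset)
      } else {
        range
      }
    }

In this reader the ranges built in createDataReaderFactories already carry concrete offsets, so the sentinel branch appears to be there mainly to keep behaviour aligned with KafkaSourceRDD.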
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/928/
Test PASSed.
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87445 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87445/testReport)** for PR 20554 at commit [`5cd5a4c`](https://github.com/apache/spark/commit/5cd5a4cad7c8ccca1a5bfa7b11fa04f939a15015).
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87239/
Test PASSed.
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87433 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87433/testReport)** for PR 20554 at commit [`f9983d9`](https://github.com/apache/spark/commit/f9983d937b97b2ba9f020f370b8aefdd353a654b).
* This patch passes all tests.
* This patch **does not merge cleanly**.
* This patch adds no public classes.
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87498/
Test PASSed.
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87445 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87445/testReport)** for PR 20554 at commit [`5cd5a4c`](https://github.com/apache/spark/commit/5cd5a4cad7c8ccca1a5bfa7b11fa04f939a15015).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87466 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87466/testReport)** for PR 20554 at commit [`ba4724b`](https://github.com/apache/spark/commit/ba4724bfbae88266c2b565c709e1f4ff4794ef57).
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/20554
LGTM again
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87445/
Test FAILed.
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87498 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87498/testReport)** for PR 20554 at commit [`2dea08a`](https://github.com/apache/spark/commit/2dea08a4c5f85991e4ad4c7da886c2e0bf456bb8).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/20554
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/733/
Test PASSed.
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87465 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87465/testReport)** for PR 20554 at commit [`66f3ef2`](https://github.com/apache/spark/commit/66f3ef2d70bf9e7468bd28ee4a4f34c35726fd98).
[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/20554
**[Test build #87433 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87433/testReport)** for PR 20554 at commit [`f9983d9`](https://github.com/apache/spark/commit/f9983d937b97b2ba9f020f370b8aefdd353a654b).