Posted to commits@spark.apache.org by li...@apache.org on 2018/09/12 18:25:35 UTC
[7/7] spark git commit: [SPARK-24882][SQL] Revert [] improve data source v2 API from branch 2.4
[SPARK-24882][SQL] Revert [] improve data source v2 API from branch 2.4
## What changes were proposed in this pull request?
As discussed on the dev list, we don't want to include https://github.com/apache/spark/pull/22009 in Spark 2.4: it requires data source v2 users to change their implementations extensively, and they would need to change them again in the next release.
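For readers who have not followed the API churn, the sketch below contrasts the two shapes in a simplified, self-contained form. The trait and method names are illustrative stand-ins patterned after the interfaces touched in the diff below (ScanConfigBuilder/ReadSupport versus DataSourceReader); they are not the actual `org.apache.spark.sql.sources.v2` classes.

```scala
// Simplified sketch only -- not the real Spark interfaces.

// Shape removed from branch-2.4 by this revert: planning runs through an
// immutable per-scan config produced by a builder.
trait SketchScanConfig { def readSchema: List[String] }
trait SketchScanConfigBuilder { def build(): SketchScanConfig }
trait SketchReadSupport {
  def fullSchema(): List[String]
  def newScanConfigBuilder(): SketchScanConfigBuilder
  def planInputPartitions(config: SketchScanConfig): Array[Int]
}

// Shape restored by this revert: the reader object itself carries the scan
// state and plans partitions directly.
trait SketchDataSourceReader {
  def readSchema(): List[String]
  def planInputPartitions(): java.util.List[Int]
}
```

Shipping the ScanConfig-based shape in 2.4 would have forced existing v2 implementations to migrate once for 2.4 and again for the next release; the revert keeps branch-2.4 on the reader-based shape instead.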
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <we...@databricks.com>
Closes #22388 from cloud-fan/revert.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15d2e9d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15d2e9d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15d2e9d7
Branch: refs/heads/branch-2.4
Commit: 15d2e9d7d2f0d5ecefd69bdc3f8a149670b05e79
Parents: 4c1428f
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Sep 12 11:25:24 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Sep 12 11:25:24 2018 -0700
----------------------------------------------------------------------
.../kafka010/KafkaContinuousReadSupport.scala | 255 -----------
.../sql/kafka010/KafkaContinuousReader.scala | 248 +++++++++++
.../kafka010/KafkaMicroBatchReadSupport.scala | 377 ----------------
.../sql/kafka010/KafkaMicroBatchReader.scala | 382 ++++++++++++++++
.../sql/kafka010/KafkaSourceProvider.scala | 37 +-
.../spark/sql/kafka010/KafkaStreamWriter.scala | 118 +++++
.../kafka010/KafkaStreamingWriteSupport.scala | 118 -----
.../kafka010/KafkaContinuousSourceSuite.scala | 12 +-
.../sql/kafka010/KafkaContinuousTest.scala | 8 +-
.../kafka010/KafkaMicroBatchSourceSuite.scala | 35 +-
.../sources/v2/BatchReadSupportProvider.java | 61 ---
.../sources/v2/BatchWriteSupportProvider.java | 59 ---
.../sql/sources/v2/ContinuousReadSupport.java | 46 ++
.../v2/ContinuousReadSupportProvider.java | 70 ---
.../spark/sql/sources/v2/DataSourceV2.java | 10 +-
.../sql/sources/v2/MicroBatchReadSupport.java | 52 +++
.../v2/MicroBatchReadSupportProvider.java | 70 ---
.../spark/sql/sources/v2/ReadSupport.java | 65 +++
.../sql/sources/v2/SessionConfigSupport.java | 7 +-
.../sql/sources/v2/StreamWriteSupport.java | 52 +++
.../v2/StreamingWriteSupportProvider.java | 54 ---
.../spark/sql/sources/v2/WriteSupport.java | 53 +++
.../sql/sources/v2/reader/BatchReadSupport.java | 51 ---
.../v2/reader/ContinuousInputPartition.java | 35 ++
.../sql/sources/v2/reader/DataSourceReader.java | 75 ++++
.../sql/sources/v2/reader/InputPartition.java | 26 +-
.../sources/v2/reader/InputPartitionReader.java | 53 +++
.../sql/sources/v2/reader/PartitionReader.java | 49 ---
.../v2/reader/PartitionReaderFactory.java | 66 ---
.../sql/sources/v2/reader/ReadSupport.java | 50 ---
.../spark/sql/sources/v2/reader/ScanConfig.java | 45 --
.../sources/v2/reader/ScanConfigBuilder.java | 30 --
.../spark/sql/sources/v2/reader/Statistics.java | 2 +-
.../v2/reader/SupportsPushDownFilters.java | 6 +-
.../reader/SupportsPushDownRequiredColumns.java | 8 +-
.../v2/reader/SupportsReportPartitioning.java | 12 +-
.../v2/reader/SupportsReportStatistics.java | 14 +-
.../v2/reader/SupportsScanColumnarBatch.java | 53 +++
.../partitioning/ClusteredDistribution.java | 4 +-
.../v2/reader/partitioning/Distribution.java | 6 +-
.../v2/reader/partitioning/Partitioning.java | 5 +-
.../ContinuousInputPartitionReader.java | 36 ++
.../streaming/ContinuousPartitionReader.java | 37 --
.../ContinuousPartitionReaderFactory.java | 40 --
.../reader/streaming/ContinuousReadSupport.java | 77 ----
.../v2/reader/streaming/ContinuousReader.java | 79 ++++
.../reader/streaming/MicroBatchReadSupport.java | 60 ---
.../v2/reader/streaming/MicroBatchReader.java | 75 ++++
.../sql/sources/v2/reader/streaming/Offset.java | 4 +-
.../reader/streaming/StreamingReadSupport.java | 49 ---
.../sources/v2/writer/BatchWriteSupport.java | 101 -----
.../sql/sources/v2/writer/DataSourceWriter.java | 116 +++++
.../spark/sql/sources/v2/writer/DataWriter.java | 16 +-
.../sources/v2/writer/DataWriterFactory.java | 23 +-
.../sources/v2/writer/WriterCommitMessage.java | 9 +-
.../v2/writer/streaming/StreamWriter.java | 71 +++
.../streaming/StreamingDataWriterFactory.java | 59 ---
.../writer/streaming/StreamingWriteSupport.java | 71 ---
.../org/apache/spark/sql/DataFrameReader.scala | 4 +-
.../org/apache/spark/sql/DataFrameWriter.scala | 8 +-
.../datasources/v2/DataSourceRDD.scala | 44 +-
.../datasources/v2/DataSourceV2Relation.scala | 72 ++-
.../datasources/v2/DataSourceV2ScanExec.scala | 65 +--
.../datasources/v2/DataSourceV2Strategy.scala | 49 +--
.../datasources/v2/DataSourceV2Utils.scala | 9 -
.../v2/WriteToDataSourceV2Exec.scala | 40 +-
.../streaming/MicroBatchExecution.scala | 91 ++--
.../execution/streaming/ProgressReporter.scala | 8 +-
.../SimpleStreamingScanConfigBuilder.scala | 40 --
.../execution/streaming/StreamingRelation.scala | 6 +-
.../spark/sql/execution/streaming/console.scala | 14 +-
.../continuous/ContinuousDataSourceRDD.scala | 37 +-
.../continuous/ContinuousExecution.scala | 51 +--
.../continuous/ContinuousQueuedDataReader.scala | 29 +-
.../continuous/ContinuousRateStreamSource.scala | 60 +--
.../continuous/ContinuousTextSocketSource.scala | 72 ++-
.../continuous/ContinuousWriteRDD.scala | 7 +-
.../streaming/continuous/EpochCoordinator.scala | 18 +-
.../WriteToContinuousDataSource.scala | 4 +-
.../WriteToContinuousDataSourceExec.scala | 10 +-
.../spark/sql/execution/streaming/memory.scala | 51 ++-
.../streaming/sources/ConsoleWriteSupport.scala | 71 ---
.../streaming/sources/ConsoleWriter.scala | 72 +++
.../sources/ContinuousMemoryStream.scala | 76 ++--
.../sources/ForeachWriteSupportProvider.scala | 140 ------
.../sources/ForeachWriterProvider.scala | 139 ++++++
.../sources/MicroBatchWritSupport.scala | 51 ---
.../streaming/sources/MicroBatchWriter.scala | 37 ++
.../sources/PackedRowWriterFactory.scala | 9 +-
.../RateControlMicroBatchReadSupport.scala | 31 --
.../RateStreamMicroBatchReadSupport.scala | 215 ---------
.../sources/RateStreamMicroBatchReader.scala | 220 ++++++++++
.../streaming/sources/RateStreamProvider.scala | 27 +-
.../execution/streaming/sources/memoryV2.scala | 35 +-
.../execution/streaming/sources/socket.scala | 114 ++---
.../spark/sql/streaming/DataStreamReader.scala | 52 +--
.../spark/sql/streaming/DataStreamWriter.scala | 9 +-
.../sql/streaming/StreamingQueryManager.scala | 4 +-
.../sources/v2/JavaAdvancedDataSourceV2.java | 147 +++----
.../sql/sources/v2/JavaBatchDataSourceV2.java | 114 +++++
.../sources/v2/JavaColumnarDataSourceV2.java | 114 -----
.../v2/JavaPartitionAwareDataSource.java | 81 ++--
.../v2/JavaSchemaRequiredDataSource.java | 26 +-
.../sql/sources/v2/JavaSimpleDataSourceV2.java | 68 ++-
.../sql/sources/v2/JavaSimpleReadSupport.java | 99 -----
....apache.spark.sql.sources.DataSourceRegister | 4 +-
.../execution/streaming/MemorySinkV2Suite.scala | 2 +-
.../sources/ConsoleWriteSupportSuite.scala | 151 -------
.../streaming/sources/ConsoleWriterSuite.scala | 153 +++++++
.../sources/RateStreamProviderSuite.scala | 84 ++--
.../sources/TextSocketStreamSuite.scala | 81 ++--
.../sql/sources/v2/DataSourceV2Suite.scala | 438 +++++++++----------
.../sources/v2/SimpleWritableDataSource.scala | 110 +++--
.../apache/spark/sql/streaming/StreamTest.scala | 2 +-
.../streaming/StreamingQueryListenerSuite.scala | 4 +-
.../sql/streaming/StreamingQuerySuite.scala | 58 ++-
.../ContinuousQueuedDataReaderSuite.scala | 45 +-
.../streaming/continuous/ContinuousSuite.scala | 2 +-
.../continuous/EpochCoordinatorSuite.scala | 18 +-
.../sources/StreamingDataSourceV2Suite.scala | 95 ++--
120 files changed, 3619 insertions(+), 4070 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
deleted file mode 100644
index 1753a28..0000000
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import java.{util => ju}
-import java.util.concurrent.TimeoutException
-
-import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException}
-import org.apache.kafka.common.TopicPartition
-
-import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.types.StructType
-
-/**
- * A [[ContinuousReadSupport]] for data from kafka.
- *
- * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
- * read by per-task consumers generated later.
- * @param kafkaParams String params for per-task Kafka consumers.
- * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
- * are not Kafka consumer params.
- * @param metadataPath Path to a directory this reader can use for writing metadata.
- * @param initialOffsets The Kafka offsets to start reading data at.
- * @param failOnDataLoss Flag indicating whether reading should fail in data loss
- * scenarios, where some offsets after the specified initial ones can't be
- * properly read.
- */
-class KafkaContinuousReadSupport(
- offsetReader: KafkaOffsetReader,
- kafkaParams: ju.Map[String, Object],
- sourceOptions: Map[String, String],
- metadataPath: String,
- initialOffsets: KafkaOffsetRangeLimit,
- failOnDataLoss: Boolean)
- extends ContinuousReadSupport with Logging {
-
- private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
-
- override def initialOffset(): Offset = {
- val offsets = initialOffsets match {
- case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
- case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
- case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
- }
- logInfo(s"Initial offsets: $offsets")
- offsets
- }
-
- override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
-
- override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
- new KafkaContinuousScanConfigBuilder(fullSchema(), start, offsetReader, reportDataLoss)
- }
-
- override def deserializeOffset(json: String): Offset = {
- KafkaSourceOffset(JsonUtils.partitionOffsets(json))
- }
-
- override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
- val startOffsets = config.asInstanceOf[KafkaContinuousScanConfig].startOffsets
- startOffsets.toSeq.map {
- case (topicPartition, start) =>
- KafkaContinuousInputPartition(
- topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
- }.toArray
- }
-
- override def createContinuousReaderFactory(
- config: ScanConfig): ContinuousPartitionReaderFactory = {
- KafkaContinuousReaderFactory
- }
-
- /** Stop this source and free any resources it has allocated. */
- def stop(): Unit = synchronized {
- offsetReader.close()
- }
-
- override def commit(end: Offset): Unit = {}
-
- override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
- val mergedMap = offsets.map {
- case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
- }.reduce(_ ++ _)
- KafkaSourceOffset(mergedMap)
- }
-
- override def needsReconfiguration(config: ScanConfig): Boolean = {
- val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
- offsetReader.fetchLatestOffsets().keySet != knownPartitions
- }
-
- override def toString(): String = s"KafkaSource[$offsetReader]"
-
- /**
- * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
- * Otherwise, just log a warning.
- */
- private def reportDataLoss(message: String): Unit = {
- if (failOnDataLoss) {
- throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
- } else {
- logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
- }
- }
-}
-
-/**
- * An input partition for continuous Kafka processing. This will be serialized and transformed
- * into a full reader on executors.
- *
- * @param topicPartition The (topic, partition) pair this task is responsible for.
- * @param startOffset The offset to start reading from within the partition.
- * @param kafkaParams Kafka consumer params to use.
- * @param pollTimeoutMs The timeout for Kafka consumer polling.
- * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
- * are skipped.
- */
-case class KafkaContinuousInputPartition(
- topicPartition: TopicPartition,
- startOffset: Long,
- kafkaParams: ju.Map[String, Object],
- pollTimeoutMs: Long,
- failOnDataLoss: Boolean) extends InputPartition
-
-object KafkaContinuousReaderFactory extends ContinuousPartitionReaderFactory {
- override def createReader(partition: InputPartition): ContinuousPartitionReader[InternalRow] = {
- val p = partition.asInstanceOf[KafkaContinuousInputPartition]
- new KafkaContinuousPartitionReader(
- p.topicPartition, p.startOffset, p.kafkaParams, p.pollTimeoutMs, p.failOnDataLoss)
- }
-}
-
-class KafkaContinuousScanConfigBuilder(
- schema: StructType,
- startOffset: Offset,
- offsetReader: KafkaOffsetReader,
- reportDataLoss: String => Unit)
- extends ScanConfigBuilder {
-
- override def build(): ScanConfig = {
- val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(startOffset)
-
- val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
- val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
- val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
-
- val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
- if (deletedPartitions.nonEmpty) {
- reportDataLoss(s"Some partitions were deleted: $deletedPartitions")
- }
-
- val startOffsets = newPartitionOffsets ++
- oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
- KafkaContinuousScanConfig(schema, startOffsets)
- }
-}
-
-case class KafkaContinuousScanConfig(
- readSchema: StructType,
- startOffsets: Map[TopicPartition, Long])
- extends ScanConfig {
-
- // Created when building the scan config builder. If this diverges from the partitions at the
- // latest offsets, we need to reconfigure the kafka read support.
- def knownPartitions: Set[TopicPartition] = startOffsets.keySet
-}
-
-/**
- * A per-task data reader for continuous Kafka processing.
- *
- * @param topicPartition The (topic, partition) pair this data reader is responsible for.
- * @param startOffset The offset to start reading from within the partition.
- * @param kafkaParams Kafka consumer params to use.
- * @param pollTimeoutMs The timeout for Kafka consumer polling.
- * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
- * are skipped.
- */
-class KafkaContinuousPartitionReader(
- topicPartition: TopicPartition,
- startOffset: Long,
- kafkaParams: ju.Map[String, Object],
- pollTimeoutMs: Long,
- failOnDataLoss: Boolean) extends ContinuousPartitionReader[InternalRow] {
- private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
- private val converter = new KafkaRecordToUnsafeRowConverter
-
- private var nextKafkaOffset = startOffset
- private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _
-
- override def next(): Boolean = {
- var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
- while (r == null) {
- if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false
- // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
- // interrupt points to end the query rather than waiting for new data that might never come.
- try {
- r = consumer.get(
- nextKafkaOffset,
- untilOffset = Long.MaxValue,
- pollTimeoutMs,
- failOnDataLoss)
- } catch {
- // We didn't read within the timeout. We're supposed to block indefinitely for new data, so
- // swallow and ignore this.
- case _: TimeoutException | _: org.apache.kafka.common.errors.TimeoutException =>
-
- // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
- // or if it's the endpoint of the data range (i.e. the "true" next offset).
- case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
- val range = consumer.getAvailableOffsetRange()
- if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
- // retry
- } else {
- throw e
- }
- }
- }
- nextKafkaOffset = r.offset + 1
- currentRecord = r
- true
- }
-
- override def get(): UnsafeRow = {
- converter.toUnsafeRow(currentRecord)
- }
-
- override def getOffset(): KafkaSourcePartitionOffset = {
- KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset)
- }
-
- override def close(): Unit = {
- consumer.release()
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
new file mode 100644
index 0000000..8ce56a2
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeoutException
+
+import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A [[ContinuousReader]] for data from kafka.
+ *
+ * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
+ * read by per-task consumers generated later.
+ * @param kafkaParams String params for per-task Kafka consumers.
+ * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
+ * are not Kafka consumer params.
+ * @param metadataPath Path to a directory this reader can use for writing metadata.
+ * @param initialOffsets The Kafka offsets to start reading data at.
+ * @param failOnDataLoss Flag indicating whether reading should fail in data loss
+ * scenarios, where some offsets after the specified initial ones can't be
+ * properly read.
+ */
+class KafkaContinuousReader(
+ offsetReader: KafkaOffsetReader,
+ kafkaParams: ju.Map[String, Object],
+ sourceOptions: Map[String, String],
+ metadataPath: String,
+ initialOffsets: KafkaOffsetRangeLimit,
+ failOnDataLoss: Boolean)
+ extends ContinuousReader with Logging {
+
+ private lazy val session = SparkSession.getActiveSession.get
+ private lazy val sc = session.sparkContext
+
+ private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+
+ // Initialized when creating reader factories. If this diverges from the partitions at the latest
+ // offsets, we need to reconfigure.
+ // Exposed outside this object only for unit tests.
+ @volatile private[sql] var knownPartitions: Set[TopicPartition] = _
+
+ override def readSchema: StructType = KafkaOffsetReader.kafkaSchema
+
+ private var offset: Offset = _
+ override def setStartOffset(start: ju.Optional[Offset]): Unit = {
+ offset = start.orElse {
+ val offsets = initialOffsets match {
+ case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
+ case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+ case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
+ }
+ logInfo(s"Initial offsets: $offsets")
+ offsets
+ }
+ }
+
+ override def getStartOffset(): Offset = offset
+
+ override def deserializeOffset(json: String): Offset = {
+ KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+ }
+
+ override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = {
+ import scala.collection.JavaConverters._
+
+ val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset)
+
+ val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
+ val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
+ val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+
+ val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
+ if (deletedPartitions.nonEmpty) {
+ reportDataLoss(s"Some partitions were deleted: $deletedPartitions")
+ }
+
+ val startOffsets = newPartitionOffsets ++
+ oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
+ knownPartitions = startOffsets.keySet
+
+ startOffsets.toSeq.map {
+ case (topicPartition, start) =>
+ KafkaContinuousInputPartition(
+ topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss
+ ): InputPartition[InternalRow]
+ }.asJava
+ }
+
+ /** Stop this source and free any resources it has allocated. */
+ def stop(): Unit = synchronized {
+ offsetReader.close()
+ }
+
+ override def commit(end: Offset): Unit = {}
+
+ override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+ val mergedMap = offsets.map {
+ case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+ }.reduce(_ ++ _)
+ KafkaSourceOffset(mergedMap)
+ }
+
+ override def needsReconfiguration(): Boolean = {
+ knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != knownPartitions
+ }
+
+ override def toString(): String = s"KafkaSource[$offsetReader]"
+
+ /**
+ * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+ * Otherwise, just log a warning.
+ */
+ private def reportDataLoss(message: String): Unit = {
+ if (failOnDataLoss) {
+ throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+ } else {
+ logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+ }
+ }
+}
+
+/**
+ * An input partition for continuous Kafka processing. This will be serialized and transformed
+ * into a full reader on executors.
+ *
+ * @param topicPartition The (topic, partition) pair this task is responsible for.
+ * @param startOffset The offset to start reading from within the partition.
+ * @param kafkaParams Kafka consumer params to use.
+ * @param pollTimeoutMs The timeout for Kafka consumer polling.
+ * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
+ * are skipped.
+ */
+case class KafkaContinuousInputPartition(
+ topicPartition: TopicPartition,
+ startOffset: Long,
+ kafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends ContinuousInputPartition[InternalRow] {
+
+ override def createContinuousReader(
+ offset: PartitionOffset): InputPartitionReader[InternalRow] = {
+ val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
+ require(kafkaOffset.topicPartition == topicPartition,
+ s"Expected topicPartition: $topicPartition, but got: ${kafkaOffset.topicPartition}")
+ new KafkaContinuousInputPartitionReader(
+ topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
+ }
+
+ override def createPartitionReader(): KafkaContinuousInputPartitionReader = {
+ new KafkaContinuousInputPartitionReader(
+ topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
+ }
+}
+
+/**
+ * A per-task data reader for continuous Kafka processing.
+ *
+ * @param topicPartition The (topic, partition) pair this data reader is responsible for.
+ * @param startOffset The offset to start reading from within the partition.
+ * @param kafkaParams Kafka consumer params to use.
+ * @param pollTimeoutMs The timeout for Kafka consumer polling.
+ * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
+ * are skipped.
+ */
+class KafkaContinuousInputPartitionReader(
+ topicPartition: TopicPartition,
+ startOffset: Long,
+ kafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean) extends ContinuousInputPartitionReader[InternalRow] {
+ private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
+ private val converter = new KafkaRecordToUnsafeRowConverter
+
+ private var nextKafkaOffset = startOffset
+ private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _
+
+ override def next(): Boolean = {
+ var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
+ while (r == null) {
+ if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false
+ // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+ // interrupt points to end the query rather than waiting for new data that might never come.
+ try {
+ r = consumer.get(
+ nextKafkaOffset,
+ untilOffset = Long.MaxValue,
+ pollTimeoutMs,
+ failOnDataLoss)
+ } catch {
+ // We didn't read within the timeout. We're supposed to block indefinitely for new data, so
+ // swallow and ignore this.
+ case _: TimeoutException | _: org.apache.kafka.common.errors.TimeoutException =>
+
+ // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
+ // or if it's the endpoint of the data range (i.e. the "true" next offset).
+ case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
+ val range = consumer.getAvailableOffsetRange()
+ if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
+ // retry
+ } else {
+ throw e
+ }
+ }
+ }
+ nextKafkaOffset = r.offset + 1
+ currentRecord = r
+ true
+ }
+
+ override def get(): UnsafeRow = {
+ converter.toUnsafeRow(currentRecord)
+ }
+
+ override def getOffset(): KafkaSourcePartitionOffset = {
+ KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset)
+ }
+
+ override def close(): Unit = {
+ consumer.release()
+ }
+}
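The restored `KafkaContinuousReader` above is exercised through the ordinary structured-streaming entry points rather than being instantiated directly. A minimal usage sketch follows; the broker address, topic name, and checkpoint path are placeholders.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-continuous-sketch").getOrCreate()

// Reading in continuous mode goes through the continuous Kafka reader restored above.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // placeholder brokers
  .option("subscribe", "topic1")                     // placeholder topic
  .option("failOnDataLoss", "false")
  .load()

val query = df.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/kafka-continuous-checkpoint") // placeholder path
  .trigger(Trigger.Continuous("1 second"))           // continuous processing trigger
  .start()

query.awaitTermination()
```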
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
deleted file mode 100644
index bb4de67..0000000
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import java.{util => ju}
-import java.io._
-import java.nio.charset.StandardCharsets
-
-import org.apache.commons.io.IOUtils
-
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.ExecutorCacheTaskLocation
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
-import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchReadSupport
-import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.UninterruptibleThread
-
-/**
- * A [[MicroBatchReadSupport]] that reads data from Kafka.
- *
- * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
- * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
- * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
- * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
- * with the semantics of `KafkaConsumer.position()`.
- *
- * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
- * must make sure all messages in a topic have been processed when deleting a topic.
- *
- * There is a known issue caused by KAFKA-1894: a query that reads from Kafka may fail to stop.
- * To avoid this issue, make sure the query is stopped before the Kafka brokers are shut down,
- * and do not use wrong broker addresses.
- */
-private[kafka010] class KafkaMicroBatchReadSupport(
- kafkaOffsetReader: KafkaOffsetReader,
- executorKafkaParams: ju.Map[String, Object],
- options: DataSourceOptions,
- metadataPath: String,
- startingOffsets: KafkaOffsetRangeLimit,
- failOnDataLoss: Boolean) extends RateControlMicroBatchReadSupport with Logging {
-
- private val pollTimeoutMs = options.getLong(
- "kafkaConsumer.pollTimeoutMs",
- SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
-
- private val maxOffsetsPerTrigger =
- Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
-
- private val rangeCalculator = KafkaOffsetRangeCalculator(options)
-
- private var endPartitionOffsets: KafkaSourceOffset = _
-
- /**
- * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
- * called in StreamExecutionThread. Otherwise, interrupting a thread while running
- * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
- */
- override def initialOffset(): Offset = {
- KafkaSourceOffset(getOrCreateInitialPartitionOffsets())
- }
-
- override def latestOffset(start: Offset): Offset = {
- val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
- val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
- endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
- rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
- }.getOrElse {
- latestPartitionOffsets
- })
- endPartitionOffsets
- }
-
- override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
-
- override def newScanConfigBuilder(start: Offset, end: Offset): ScanConfigBuilder = {
- new SimpleStreamingScanConfigBuilder(fullSchema(), start, Some(end))
- }
-
- override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
- val sc = config.asInstanceOf[SimpleStreamingScanConfig]
- val startPartitionOffsets = sc.start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
- val endPartitionOffsets = sc.end.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets
-
- // Find the new partitions, and get their earliest offsets
- val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
- val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
- if (newPartitionInitialOffsets.keySet != newPartitions) {
- // We cannot get from offsets for some partitions. It means they got deleted.
- val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
- reportDataLoss(
- s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
- }
- logInfo(s"Partitions added: $newPartitionInitialOffsets")
- newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
- reportDataLoss(
- s"Added partition $p starts from $o instead of 0. Some data may have been missed")
- }
-
- // Find deleted partitions, and report data loss if required
- val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
- if (deletedPartitions.nonEmpty) {
- reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
- }
-
- // Use the end partitions to calculate offset ranges to ignore partitions that have
- // been deleted
- val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
- // Ignore partitions that we don't know the from offsets.
- newPartitionInitialOffsets.contains(tp) || startPartitionOffsets.contains(tp)
- }.toSeq
- logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
-
- // Calculate offset ranges
- val offsetRanges = rangeCalculator.getRanges(
- fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
- untilOffsets = endPartitionOffsets,
- executorLocations = getSortedExecutorList())
-
- // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
- // that is, concurrent tasks will not read the same TopicPartitions.
- val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size
-
- // Generate factories based on the offset ranges
- offsetRanges.map { range =>
- KafkaMicroBatchInputPartition(
- range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
- }.toArray
- }
-
- override def createReaderFactory(config: ScanConfig): PartitionReaderFactory = {
- KafkaMicroBatchReaderFactory
- }
-
- override def deserializeOffset(json: String): Offset = {
- KafkaSourceOffset(JsonUtils.partitionOffsets(json))
- }
-
- override def commit(end: Offset): Unit = {}
-
- override def stop(): Unit = {
- kafkaOffsetReader.close()
- }
-
- override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
-
- /**
- * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
- * the checkpoint.
- */
- private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
- // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
- // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
- // (KAFKA-1894).
- assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
-
- // SparkSession is required for getting Hadoop configuration for writing to checkpoints
- assert(SparkSession.getActiveSession.nonEmpty)
-
- val metadataLog =
- new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
- metadataLog.get(0).getOrElse {
- val offsets = startingOffsets match {
- case EarliestOffsetRangeLimit =>
- KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
- case LatestOffsetRangeLimit =>
- KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
- case SpecificOffsetRangeLimit(p) =>
- kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
- }
- metadataLog.add(0, offsets)
- logInfo(s"Initial offsets: $offsets")
- offsets
- }.partitionToOffsets
- }
-
- /** Proportionally distribute limit number of offsets among topicpartitions */
- private def rateLimit(
- limit: Long,
- from: PartitionOffsetMap,
- until: PartitionOffsetMap): PartitionOffsetMap = {
- val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
- val sizes = until.flatMap {
- case (tp, end) =>
- // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
- from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
- val size = end - begin
- logDebug(s"rateLimit $tp size is $size")
- if (size > 0) Some(tp -> size) else None
- }
- }
- val total = sizes.values.sum.toDouble
- if (total < 1) {
- until
- } else {
- until.map {
- case (tp, end) =>
- tp -> sizes.get(tp).map { size =>
- val begin = from.get(tp).getOrElse(fromNew(tp))
- val prorate = limit * (size / total)
- // Don't completely starve small topicpartitions
- val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
- // Paranoia, make sure not to return an offset that's past end
- Math.min(end, off)
- }.getOrElse(end)
- }
- }
- }
-
- private def getSortedExecutorList(): Array[String] = {
-
- def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
- if (a.host == b.host) {
- a.executorId > b.executorId
- } else {
- a.host > b.host
- }
- }
-
- val bm = SparkEnv.get.blockManager
- bm.master.getPeers(bm.blockManagerId).toArray
- .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
- .sortWith(compare)
- .map(_.toString)
- }
-
- /**
- * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
- * Otherwise, just log a warning.
- */
- private def reportDataLoss(message: String): Unit = {
- if (failOnDataLoss) {
- throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
- } else {
- logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
- }
- }
-
- /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
- class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
- extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
-
- val VERSION = 1
-
- override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
- out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
- val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
- writer.write("v" + VERSION + "\n")
- writer.write(metadata.json)
- writer.flush
- }
-
- override def deserialize(in: InputStream): KafkaSourceOffset = {
- in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
- val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
- // HDFSMetadataLog guarantees that it never creates a partial file.
- assert(content.length != 0)
- if (content(0) == 'v') {
- val indexOfNewLine = content.indexOf("\n")
- if (indexOfNewLine > 0) {
- val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
- KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
- } else {
- throw new IllegalStateException(
- s"Log file was malformed: failed to detect the log file version line.")
- }
- } else {
- // The log was generated by Spark 2.1.0
- KafkaSourceOffset(SerializedOffset(content))
- }
- }
- }
-}
-
-/** An [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchInputPartition(
- offsetRange: KafkaOffsetRange,
- executorKafkaParams: ju.Map[String, Object],
- pollTimeoutMs: Long,
- failOnDataLoss: Boolean,
- reuseKafkaConsumer: Boolean) extends InputPartition
-
-private[kafka010] object KafkaMicroBatchReaderFactory extends PartitionReaderFactory {
- override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
- val p = partition.asInstanceOf[KafkaMicroBatchInputPartition]
- KafkaMicroBatchPartitionReader(p.offsetRange, p.executorKafkaParams, p.pollTimeoutMs,
- p.failOnDataLoss, p.reuseKafkaConsumer)
- }
-}
-
-/** A [[PartitionReader]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchPartitionReader(
- offsetRange: KafkaOffsetRange,
- executorKafkaParams: ju.Map[String, Object],
- pollTimeoutMs: Long,
- failOnDataLoss: Boolean,
- reuseKafkaConsumer: Boolean) extends PartitionReader[InternalRow] with Logging {
-
- private val consumer = KafkaDataConsumer.acquire(
- offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
-
- private val rangeToRead = resolveRange(offsetRange)
- private val converter = new KafkaRecordToUnsafeRowConverter
-
- private var nextOffset = rangeToRead.fromOffset
- private var nextRow: UnsafeRow = _
-
- override def next(): Boolean = {
- if (nextOffset < rangeToRead.untilOffset) {
- val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
- if (record != null) {
- nextRow = converter.toUnsafeRow(record)
- nextOffset = record.offset + 1
- true
- } else {
- false
- }
- } else {
- false
- }
- }
-
- override def get(): UnsafeRow = {
- assert(nextRow != null)
- nextRow
- }
-
- override def close(): Unit = {
- consumer.release()
- }
-
- private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
- if (range.fromOffset < 0 || range.untilOffset < 0) {
- // Late bind the offset range
- val availableOffsetRange = consumer.getAvailableOffsetRange()
- val fromOffset = if (range.fromOffset < 0) {
- assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST,
- s"earliest offset ${range.fromOffset} does not equal ${KafkaOffsetRangeLimit.EARLIEST}")
- availableOffsetRange.earliest
- } else {
- range.fromOffset
- }
- val untilOffset = if (range.untilOffset < 0) {
- assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST,
- s"latest offset ${range.untilOffset} does not equal ${KafkaOffsetRangeLimit.LATEST}")
- availableOffsetRange.latest
- } else {
- range.untilOffset
- }
- KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, None)
- } else {
- range
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
new file mode 100644
index 0000000..8cc989f
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
+ * must make sure all messages in a topic have been processed when deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: a query that reads from Kafka may fail to stop.
+ * To avoid this issue, make sure the query is stopped before the Kafka brokers are shut down,
+ * and do not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+ kafkaOffsetReader: KafkaOffsetReader,
+ executorKafkaParams: ju.Map[String, Object],
+ options: DataSourceOptions,
+ metadataPath: String,
+ startingOffsets: KafkaOffsetRangeLimit,
+ failOnDataLoss: Boolean)
+ extends MicroBatchReader with Logging {
+
+ private var startPartitionOffsets: PartitionOffsetMap = _
+ private var endPartitionOffsets: PartitionOffsetMap = _
+
+ private val pollTimeoutMs = options.getLong(
+ "kafkaConsumer.pollTimeoutMs",
+ SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
+
+ private val maxOffsetsPerTrigger =
+ Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+ private val rangeCalculator = KafkaOffsetRangeCalculator(options)
+ /**
+ * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+ * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+ * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+ */
+ private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
+
+ override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
+ // Make sure initialPartitionOffsets is initialized
+ initialPartitionOffsets
+
+ startPartitionOffsets = Option(start.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse(initialPartitionOffsets)
+
+ endPartitionOffsets = Option(end.orElse(null))
+ .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+ .getOrElse {
+ val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+ maxOffsetsPerTrigger.map { maxOffsets =>
+ rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+ }.getOrElse {
+ latestPartitionOffsets
+ }
+ }
+ }
+
+ override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = {
+ // Find the new partitions, and get their earliest offsets
+ val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+ val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+ if (newPartitionInitialOffsets.keySet != newPartitions) {
+ // We cannot get from offsets for some partitions. It means they got deleted.
+ val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
+ reportDataLoss(
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ }
+ logInfo(s"Partitions added: $newPartitionInitialOffsets")
+ newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+ reportDataLoss(
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ }
+
+ // Find deleted partitions, and report data loss if required
+ val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+ if (deletedPartitions.nonEmpty) {
+ reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+ }
+
+ // Use the end partitions to calculate offset ranges to ignore partitions that have
+ // been deleted
+ val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+ // Ignore partitions that we don't know the from offsets.
+ newPartitionInitialOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+ }.toSeq
+ logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+ // Calculate offset ranges
+ val offsetRanges = rangeCalculator.getRanges(
+ fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
+ untilOffsets = endPartitionOffsets,
+ executorLocations = getSortedExecutorList())
+
+ // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
+ // that is, concurrent tasks will not read the same TopicPartitions.
+ val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size
+
+ // Generate factories based on the offset ranges
+ offsetRanges.map { range =>
+ new KafkaMicroBatchInputPartition(
+ range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer
+ ): InputPartition[InternalRow]
+ }.asJava
+ }
+
+ override def getStartOffset: Offset = {
+ KafkaSourceOffset(startPartitionOffsets)
+ }
+
+ override def getEndOffset: Offset = {
+ KafkaSourceOffset(endPartitionOffsets)
+ }
+
+ override def deserializeOffset(json: String): Offset = {
+ KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+ }
+
+ override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+ override def commit(end: Offset): Unit = {}
+
+ override def stop(): Unit = {
+ kafkaOffsetReader.close()
+ }
+
+ override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
+
+ /**
+ * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
+ * the checkpoint.
+ */
+ private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
+ // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
+ // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
+ // (KAFKA-1894).
+ assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+
+ // SparkSession is required for getting Hadoop configuration for writing to checkpoints
+ assert(SparkSession.getActiveSession.nonEmpty)
+
+ val metadataLog =
+ new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
+ metadataLog.get(0).getOrElse {
+ val offsets = startingOffsets match {
+ case EarliestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
+ case LatestOffsetRangeLimit =>
+ KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+ case SpecificOffsetRangeLimit(p) =>
+ kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
+ }
+ metadataLog.add(0, offsets)
+ logInfo(s"Initial offsets: $offsets")
+ offsets
+ }.partitionToOffsets
+ }
+
+ /** Proportionally distribute limit number of offsets among topicpartitions */
+ private def rateLimit(
+ limit: Long,
+ from: PartitionOffsetMap,
+ until: PartitionOffsetMap): PartitionOffsetMap = {
+ val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+ val sizes = until.flatMap {
+ case (tp, end) =>
+ // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+ from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+ val size = end - begin
+ logDebug(s"rateLimit $tp size is $size")
+ if (size > 0) Some(tp -> size) else None
+ }
+ }
+ val total = sizes.values.sum.toDouble
+ if (total < 1) {
+ until
+ } else {
+ until.map {
+ case (tp, end) =>
+ tp -> sizes.get(tp).map { size =>
+ val begin = from.get(tp).getOrElse(fromNew(tp))
+ val prorate = limit * (size / total)
+ // Don't completely starve small topicpartitions
+ val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+ // Paranoia, make sure not to return an offset that's past end
+ Math.min(end, off)
+ }.getOrElse(end)
+ }
+ }
+ }
+
+ private def getSortedExecutorList(): Array[String] = {
+
+ def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+ if (a.host == b.host) {
+ a.executorId > b.executorId
+ } else {
+ a.host > b.host
+ }
+ }
+
+ val bm = SparkEnv.get.blockManager
+ bm.master.getPeers(bm.blockManagerId).toArray
+ .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+ .sortWith(compare)
+ .map(_.toString)
+ }
+
+ /**
+ * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+ * Otherwise, just log a warning.
+ */
+ private def reportDataLoss(message: String): Unit = {
+ if (failOnDataLoss) {
+ throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+ } else {
+ logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+ }
+ }
+
+ /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
+ class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
+ extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
+
+ val VERSION = 1
+
+ override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+ out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+ val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+ writer.write("v" + VERSION + "\n")
+ writer.write(metadata.json)
+ writer.flush
+ }
+
+ override def deserialize(in: InputStream): KafkaSourceOffset = {
+ in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+ val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+ // HDFSMetadataLog guarantees that it never creates a partial file.
+ assert(content.length != 0)
+ if (content(0) == 'v') {
+ val indexOfNewLine = content.indexOf("\n")
+ if (indexOfNewLine > 0) {
+ val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+ KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
+ } else {
+ throw new IllegalStateException(
+ s"Log file was malformed: failed to detect the log file version line.")
+ }
+ } else {
+ // The log was generated by Spark 2.1.0
+ KafkaSourceOffset(SerializedOffset(content))
+ }
+ }
+ }
+}
+
+/** An [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] case class KafkaMicroBatchInputPartition(
+ offsetRange: KafkaOffsetRange,
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean,
+ reuseKafkaConsumer: Boolean) extends InputPartition[InternalRow] {
+
+ override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray
+
+ override def createPartitionReader(): InputPartitionReader[InternalRow] =
+ new KafkaMicroBatchInputPartitionReader(offsetRange, executorKafkaParams, pollTimeoutMs,
+ failOnDataLoss, reuseKafkaConsumer)
+}
+
+/** An [[InputPartitionReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] case class KafkaMicroBatchInputPartitionReader(
+ offsetRange: KafkaOffsetRange,
+ executorKafkaParams: ju.Map[String, Object],
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean,
+ reuseKafkaConsumer: Boolean) extends InputPartitionReader[InternalRow] with Logging {
+
+ private val consumer = KafkaDataConsumer.acquire(
+ offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
+
+ private val rangeToRead = resolveRange(offsetRange)
+ private val converter = new KafkaRecordToUnsafeRowConverter
+
+ private var nextOffset = rangeToRead.fromOffset
+ private var nextRow: UnsafeRow = _
+
+ override def next(): Boolean = {
+ if (nextOffset < rangeToRead.untilOffset) {
+ val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
+ if (record != null) {
+ nextRow = converter.toUnsafeRow(record)
+ nextOffset = record.offset + 1
+ true
+ } else {
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ override def get(): UnsafeRow = {
+ assert(nextRow != null)
+ nextRow
+ }
+
+ override def close(): Unit = {
+ consumer.release()
+ }
+
+ private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
+ if (range.fromOffset < 0 || range.untilOffset < 0) {
+ // Late bind the offset range
+ val availableOffsetRange = consumer.getAvailableOffsetRange()
+ val fromOffset = if (range.fromOffset < 0) {
+ assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST,
+ s"earliest offset ${range.fromOffset} does not equal ${KafkaOffsetRangeLimit.EARLIEST}")
+ availableOffsetRange.earliest
+ } else {
+ range.fromOffset
+ }
+ val untilOffset = if (range.untilOffset < 0) {
+ assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST,
+ s"latest offset ${range.untilOffset} does not equal ${KafkaOffsetRangeLimit.LATEST}")
+ availableOffsetRange.latest
+ } else {
+ range.untilOffset
+ }
+ KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, None)
+ } else {
+ range
+ }
+ }
+}
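For readers following the proration logic in rateLimit() above, here is a minimal, self-contained Scala sketch of the same arithmetic. It is illustrative only: the object name, the string keys standing in for TopicPartition, and the sample numbers are placeholders and are not part of the patch.

// Standalone sketch of the proration performed by rateLimit() above.
// Topic-partitions are plain strings here to avoid a Kafka dependency;
// the real code keys on org.apache.kafka.common.TopicPartition.
object RateLimitSketch {
  def rateLimit(limit: Long,
                from: Map[String, Long],
                until: Map[String, Long]): Map[String, Long] = {
    // Size of each partition's unread range.
    val sizes = until.flatMap { case (tp, end) =>
      from.get(tp).flatMap { begin =>
        val size = end - begin
        if (size > 0) Some(tp -> size) else None
      }
    }
    val total = sizes.values.sum.toDouble
    if (total < 1) until
    else until.map { case (tp, end) =>
      tp -> sizes.get(tp).map { size =>
        val begin = from(tp)
        val prorate = limit * (size / total)
        // Small partitions still advance by at least one offset.
        val off = begin + (if (prorate < 1) math.ceil(prorate) else math.floor(prorate)).toLong
        // Never return an offset past the known end.
        math.min(end, off)
      }.getOrElse(end)
    }
  }

  def main(args: Array[String]): Unit = {
    val from  = Map("t-0" -> 0L,   "t-1" -> 0L)
    val until = Map("t-0" -> 900L, "t-1" -> 100L)
    // With a trigger limit of 10 offsets, t-0 receives 9 and t-1 receives 1.
    println(rateLimit(10L, from, until)) // Map(t-0 -> 9, t-1 -> 1)
  }
}

The limit passed to the real method comes from the maxOffsetsPerTrigger option; the distribution is proportional to each partition's backlog, with the floor/ceil dance preventing a tiny partition from being rounded down to zero progress.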
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 28c9853..d225c1e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,8 +30,9 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -45,9 +46,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with StreamSinkProvider
with RelationProvider
with CreatableRelationProvider
- with StreamingWriteSupportProvider
- with ContinuousReadSupportProvider
- with MicroBatchReadSupportProvider
+ with StreamWriteSupport
+ with ContinuousReadSupport
+ with MicroBatchReadSupport
with Logging {
import KafkaSourceProvider._
@@ -107,12 +108,13 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
/**
- * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read
- * batches of Kafka data in a micro-batch streaming query.
+ * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read batches
+ * of Kafka data in a micro-batch streaming query.
*/
- override def createMicroBatchReadSupport(
+ override def createMicroBatchReader(
+ schema: Optional[StructType],
metadataPath: String,
- options: DataSourceOptions): KafkaMicroBatchReadSupport = {
+ options: DataSourceOptions): KafkaMicroBatchReader = {
val parameters = options.asMap().asScala.toMap
validateStreamOptions(parameters)
@@ -138,7 +140,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
parameters,
driverGroupIdPrefix = s"$uniqueGroupId-driver")
- new KafkaMicroBatchReadSupport(
+ new KafkaMicroBatchReader(
kafkaOffsetReader,
kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
options,
@@ -148,12 +150,13 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
/**
- * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
+ * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader]] to read
* Kafka data in a continuous streaming query.
*/
- override def createContinuousReadSupport(
+ override def createContinuousReader(
+ schema: Optional[StructType],
metadataPath: String,
- options: DataSourceOptions): KafkaContinuousReadSupport = {
+ options: DataSourceOptions): KafkaContinuousReader = {
val parameters = options.asMap().asScala.toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
@@ -178,7 +181,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
parameters,
driverGroupIdPrefix = s"$uniqueGroupId-driver")
- new KafkaContinuousReadSupport(
+ new KafkaContinuousReader(
kafkaOffsetReader,
kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
parameters,
@@ -267,11 +270,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
}
- override def createStreamingWriteSupport(
+ override def createStreamWriter(
queryId: String,
schema: StructType,
mode: OutputMode,
- options: DataSourceOptions): StreamingWriteSupport = {
+ options: DataSourceOptions): StreamWriter = {
import scala.collection.JavaConverters._
val spark = SparkSession.getActiveSession.get
@@ -282,7 +285,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
KafkaWriter.validateQuery(
schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic)
- new KafkaStreamingWriteSupport(topic, producerParams, schema)
+ new KafkaStreamWriter(topic, producerParams, schema)
}
private def strategy(caseInsensitiveParams: Map[String, String]) =
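To see what this change means for callers, here is a hedged sketch of driving the restored entry points directly, adapted from the suite changes further down in this commit. The broker address, topic, checkpoint path, and offsets are placeholders; the snippet assumes an active SparkSession, a reachable broker at the placeholder address, and that it runs inside the org.apache.spark.sql.kafka010 package (KafkaSourceProvider and KafkaSourceOffset are private[kafka010]).

import java.util.Optional
import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
import org.apache.spark.sql.types.StructType

val provider = new KafkaSourceProvider()
val options = new DataSourceOptions(Map(
  "kafka.bootstrap.servers" -> "host:9092",
  "subscribe" -> "topic-1").asJava)
val tp = new TopicPartition("topic-1", 0)

// Post-revert, the (optional) user schema is an explicit argument again,
// and offsets are set on the reader instead of going through a ScanConfig.
val reader = provider.createMicroBatchReader(
  Optional.empty[StructType], "/tmp/checkpoint", options)
reader.setOffsetRange(
  Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))),
  Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L))))
val partitions = reader.planInputPartitions().asScala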
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
new file mode 100644
index 0000000..97c577d
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
+ * don't need to really send one.
+ */
+case object KafkaWriterCommitMessage extends WriterCommitMessage
+
+/**
+ * A [[StreamWriter]] for Kafka writing. Responsible for generating the writer factory.
+ *
+ * @param topic The topic this writer is responsible for. If None, topic will be inferred from
+ * a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+class KafkaStreamWriter(
+ topic: Option[String], producerParams: Map[String, String], schema: StructType)
+ extends StreamWriter {
+
+ validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
+
+ override def createWriterFactory(): KafkaStreamWriterFactory =
+ KafkaStreamWriterFactory(topic, producerParams, schema)
+
+ override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+ override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+/**
+ * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to
+ * generate the per-task data writers.
+ * @param topic The topic that should be written to. If None, topic will be inferred from
+ * a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+case class KafkaStreamWriterFactory(
+ topic: Option[String], producerParams: Map[String, String], schema: StructType)
+ extends DataWriterFactory[InternalRow] {
+
+ override def createDataWriter(
+ partitionId: Int,
+ taskId: Long,
+ epochId: Long): DataWriter[InternalRow] = {
+ new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
+ }
+}
+
+/**
+ * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to
+ * process incoming rows.
+ *
+ * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
+ * from a `topic` field in the incoming data.
+ * @param producerParams Parameters to use for the Kafka producer.
+ * @param inputSchema The attributes in the input data.
+ */
+class KafkaStreamDataWriter(
+ targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute])
+ extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
+ import scala.collection.JavaConverters._
+
+ private lazy val producer = CachedKafkaProducer.getOrCreate(
+ new java.util.HashMap[String, Object](producerParams.asJava))
+
+ def write(row: InternalRow): Unit = {
+ checkForErrors()
+ sendRow(row, producer)
+ }
+
+ def commit(): WriterCommitMessage = {
+ // Send is asynchronous, but we can't commit until all rows are actually in Kafka.
+ // This requires flushing and then checking that no callbacks produced errors.
+ // We also check for errors before to fail as soon as possible - the check is cheap.
+ checkForErrors()
+ producer.flush()
+ checkForErrors()
+ KafkaWriterCommitMessage
+ }
+
+ def abort(): Unit = {}
+
+ def close(): Unit = {
+ checkForErrors()
+ if (producer != null) {
+ producer.flush()
+ checkForErrors()
+ CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
+ }
+ }
+}
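To recap how the restored classes in this file fit together, a brief sketch of the call sequence follows. The topic, producer params, and values are placeholders, and the commented lines describe what the surrounding query engine does rather than code from the patch.

import org.apache.spark.sql.kafka010.KafkaStreamWriter
import org.apache.spark.sql.types.StructType

val schema = new StructType().add("value", "string")

// Driver: one StreamWriter per streaming query; it only produces a serializable factory.
val writer = new KafkaStreamWriter(
  topic = Some("events"),
  producerParams = Map("bootstrap.servers" -> "host:9092"),
  schema = schema)
val factory = writer.createWriterFactory()

// Executors: one DataWriter per (partition, task, epoch). Rows flow through write();
// commit() flushes the async sends and surfaces any producer callback errors.
val task = factory.createDataWriter(partitionId = 0, taskId = 0L, epochId = 0L)
// task.write(row); ...; val msg = task.commit()

// Driver again: the epoch-level commit/abort receive every task's commit message;
// for Kafka both are no-ops because the data is already durable once tasks commit.
// writer.commit(epochId, messages); writer.abort(epochId, messages)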
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
deleted file mode 100644
index 927c56d..0000000
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
-import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
-import org.apache.spark.sql.types.StructType
-
-/**
- * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
- * don't need to really send one.
- */
-case object KafkaWriterCommitMessage extends WriterCommitMessage
-
-/**
- * A [[StreamingWriteSupport]] for Kafka writing. Responsible for generating the writer factory.
- *
- * @param topic The topic this writer is responsible for. If None, topic will be inferred from
- * a `topic` field in the incoming data.
- * @param producerParams Parameters for Kafka producers in each task.
- * @param schema The schema of the input data.
- */
-class KafkaStreamingWriteSupport(
- topic: Option[String], producerParams: Map[String, String], schema: StructType)
- extends StreamingWriteSupport {
-
- validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
-
- override def createStreamingWriterFactory(): KafkaStreamWriterFactory =
- KafkaStreamWriterFactory(topic, producerParams, schema)
-
- override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
- override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
-}
-
-/**
- * A [[StreamingDataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to
- * generate the per-task data writers.
- * @param topic The topic that should be written to. If None, topic will be inferred from
- * a `topic` field in the incoming data.
- * @param producerParams Parameters for Kafka producers in each task.
- * @param schema The schema of the input data.
- */
-case class KafkaStreamWriterFactory(
- topic: Option[String], producerParams: Map[String, String], schema: StructType)
- extends StreamingDataWriterFactory {
-
- override def createWriter(
- partitionId: Int,
- taskId: Long,
- epochId: Long): DataWriter[InternalRow] = {
- new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
- }
-}
-
-/**
- * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to
- * process incoming rows.
- *
- * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
- * from a `topic` field in the incoming data.
- * @param producerParams Parameters to use for the Kafka producer.
- * @param inputSchema The attributes in the input data.
- */
-class KafkaStreamDataWriter(
- targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute])
- extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
- import scala.collection.JavaConverters._
-
- private lazy val producer = CachedKafkaProducer.getOrCreate(
- new java.util.HashMap[String, Object](producerParams.asJava))
-
- def write(row: InternalRow): Unit = {
- checkForErrors()
- sendRow(row, producer)
- }
-
- def commit(): WriterCommitMessage = {
- // Send is asynchronous, but we can't commit until all rows are actually in Kafka.
- // This requires flushing and then checking that no callbacks produced errors.
- // We also check for errors before to fail as soon as possible - the check is cheap.
- checkForErrors()
- producer.flush()
- checkForErrors()
- KafkaWriterCommitMessage
- }
-
- def abort(): Unit = {}
-
- def close(): Unit = {
- checkForErrors()
- if (producer != null) {
- producer.flush()
- checkForErrors()
- CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
- }
- }
-}
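For implementers following along: the file removed here was the new-API twin of the file restored above, so the revert amounts to a set of renames. StreamingWriteSupport goes back to StreamWriter (and KafkaStreamingWriteSupport back to KafkaStreamWriter), createStreamingWriterFactory() back to createWriterFactory(), StreamingDataWriterFactory back to DataWriterFactory[InternalRow], and createWriter(partitionId, taskId, epochId) back to createDataWriter(partitionId, taskId, epochId); the per-row write(), commit(), abort(), and close() paths are unchanged.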
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index af51021..a0e5818 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.streaming.Trigger
@@ -207,13 +207,11 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
testUtils.createTopic(topic2, partitions = 5)
eventually(timeout(streamingTimeout)) {
assert(
- query.lastExecution.executedPlan.collectFirst {
- case scan: DataSourceV2ScanExec
- if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
- scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
- }.exists { config =>
+ query.lastExecution.logical.collectFirst {
+ case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
+ }.exists { r =>
// Ensure the new topic is present and the old topic is gone.
- config.knownPartitions.exists(_.topic == topic2)
+ r.knownPartitions.exists(_.topic == topic2)
},
s"query never reconfigured to new topic $topic2")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index fa6bdc2..fa1468a 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.streaming.Trigger
@@ -46,10 +46,8 @@ trait KafkaContinuousTest extends KafkaSourceTest {
testUtils.addPartitions(topic, newCount)
eventually(timeout(streamingTimeout)) {
assert(
- query.lastExecution.executedPlan.collectFirst {
- case scan: DataSourceV2ScanExec
- if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
- scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
+ query.lastExecution.logical.collectFirst {
+ case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
}.exists(_.knownPartitions.size == newCount),
s"query never reconfigured to $newCount partitions")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 8e246db..65615fd 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
import java.io._
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Paths}
-import java.util.Locale
+import java.util.{Locale, Optional}
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
@@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.Random
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
@@ -40,9 +40,11 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest {
@@ -112,16 +114,14 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
query.nonEmpty,
"Cannot add data when there is no query for finding the active kafka source")
- val sources: Seq[BaseStreamingSource] = {
+ val sources = {
query.get.logicalPlan.collect {
case StreamingExecutionRelation(source: KafkaSource, _) => source
- case StreamingExecutionRelation(source: KafkaMicroBatchReadSupport, _) => source
+ case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => source
} ++ (query.get.lastExecution match {
case null => Seq()
case e => e.logical.collect {
- case r: StreamingDataSourceV2Relation
- if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
- r.readSupport.asInstanceOf[KafkaContinuousReadSupport]
+ case StreamingDataSourceV2Relation(_, _, _, reader: KafkaContinuousReader) => reader
}
})
}.distinct
@@ -905,7 +905,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
makeSureGetOffsetCalled,
AssertOnQuery { query =>
query.logicalPlan.collect {
- case StreamingExecutionRelation(_: KafkaMicroBatchReadSupport, _) => true
+ case StreamingExecutionRelation(_: KafkaMicroBatchReader, _) => true
}.nonEmpty
}
)
@@ -930,16 +930,17 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
"kafka.bootstrap.servers" -> testUtils.brokerAddress,
"subscribe" -> topic
) ++ Option(minPartitions).map { p => "minPartitions" -> p}
- val readSupport = provider.createMicroBatchReadSupport(
- dir.getAbsolutePath, new DataSourceOptions(options.asJava))
- val config = readSupport.newScanConfigBuilder(
- KafkaSourceOffset(Map(tp -> 0L)),
- KafkaSourceOffset(Map(tp -> 100L))).build()
- val inputPartitions = readSupport.planInputPartitions(config)
+ val reader = provider.createMicroBatchReader(
+ Optional.empty[StructType], dir.getAbsolutePath, new DataSourceOptions(options.asJava))
+ reader.setOffsetRange(
+ Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))),
+ Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
+ )
+ val factories = reader.planInputPartitions().asScala
.map(_.asInstanceOf[KafkaMicroBatchInputPartition])
- withClue(s"minPartitions = $minPartitions generated factories $inputPartitions\n\t") {
- assert(inputPartitions.size == numPartitionsGenerated)
- inputPartitions.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }
+ withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
+ assert(factories.size == numPartitionsGenerated)
+ factories.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org