Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2018/02/09 01:54:06 UTC

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/20554

    [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

    ## What changes were proposed in this pull request?
    Migrating KafkaSource (with data source v1) to KafkaMicroBatchReader (with data source v2).
    
    ## How was this patch tested?
    Existing tests, a few of which were modified to be stronger than the originals.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-23362

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20554.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20554
    
----
commit 3ed2a509276194214875f39e1e18d8093155c54c
Author: Tathagata Das <ta...@...>
Date:   2018-02-09T01:46:56Z

    Migrated

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r168120568
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.io._
    +import java.nio.charset.StandardCharsets
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.kafka.common.TopicPartition
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler.ExecutorCacheTaskLocation
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
    +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.UninterruptibleThread
    +
    +/**
    + * A [[MicroBatchReader]] that reads data from Kafka.
    + *
    + * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
    + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
    + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
    + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
    + * with the semantics of `KafkaConsumer.position()`.
    + *
    + * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
    + * must make sure all messages in a topic have been processed before deleting the topic.
    + *
    + * There is a known issue caused by KAFKA-1894: a query using Kafka may not be stoppable.
    + * To avoid this issue, make sure to stop the query before stopping the Kafka brokers,
    + * and do not use wrong broker addresses.
    + */
    +private[kafka010] class KafkaMicroBatchReader(
    +    kafkaOffsetReader: KafkaOffsetReader,
    +    executorKafkaParams: ju.Map[String, Object],
    +    options: DataSourceOptions,
    +    metadataPath: String,
    +    startingOffsets: KafkaOffsetRangeLimit,
    +    failOnDataLoss: Boolean)
    +  extends MicroBatchReader with Logging {
    +
    +  type PartitionOffsetMap = Map[TopicPartition, Long]
    +
    +  private var startPartitionOffsets: PartitionOffsetMap = _
    +  private var endPartitionOffsets: PartitionOffsetMap = _
    +
    +  private val pollTimeoutMs = options.getLong(
    +    "kafkaConsumer.pollTimeoutMs",
    +    SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
    +
    +  private val maxOffsetsPerTrigger =
    +    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
    +
    +  /**
    +   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    +   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
    +   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    +   */
    +  private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
    +
    +  override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
    +    // Make sure initialPartitionOffsets is initialized
    +    initialPartitionOffsets
    +
    +    startPartitionOffsets = Option(start.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse(initialPartitionOffsets)
    +
    +    endPartitionOffsets = Option(end.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse {
    +          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
    +          maxOffsetsPerTrigger.map { maxOffsets =>
    +            rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
    +          }.getOrElse {
    +            latestPartitionOffsets
    +          }
    +        }
    +  }
    +
    +  override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
    +    // Find the new partitions, and get their earliest offsets
    +    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
    +    val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
    +    if (newPartitionOffsets.keySet != newPartitions) {
    +      // We cannot get from offsets for some partitions. It means they got deleted.
    +      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
    +      reportDataLoss(
    +        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
    +    }
    +    logInfo(s"Partitions added: $newPartitionOffsets")
    +    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
    +      reportDataLoss(
    +        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
    +    }
    +
    +    // Find deleted partitions, and report data loss if required
    +    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
    +    if (deletedPartitions.nonEmpty) {
    +      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
    +    }
    +
    +    // Use the until partitions to calculate offset ranges to ignore partitions that have
    +    // been deleted
    +    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
    +      // Ignore partitions that we don't know the from offsets.
    +      newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
    +    }.toSeq
    +    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
    +
    +    val sortedExecutors = getSortedExecutorList()
    +    val numExecutors = sortedExecutors.length
    +    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
    +
    +    // Calculate offset ranges
    +    val factories = topicPartitions.flatMap { tp =>
    +      val fromOffset = startPartitionOffsets.get(tp).getOrElse {
    +        newPartitionOffsets.getOrElse(
    +        tp, {
    +          // This should not happen since newPartitionOffsets contains all partitions not in
    +          // fromPartitionOffsets
    +          throw new IllegalStateException(s"$tp doesn't have a from offset")
    +        })
    +      }
    +      val untilOffset = endPartitionOffsets(tp)
    +
    +      if (untilOffset >= fromOffset) {
    +        // This allows cached KafkaConsumers in the executors to be re-used to read the same
    +        // partition in every batch.
    +        val preferredLoc = if (numExecutors > 0) {
    +          Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
    +        } else None
    +        val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
    +        Some(
    +          new KafkaMicroBatchDataReaderFactory(
    +            range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
    +      } else {
    +        reportDataLoss(
    +          s"Partition $tp's offset was changed from " +
    +            s"$fromOffset to $untilOffset, some data may have been missed")
    +        None
    +      }
    +    }
    +    factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
    +  }
    +
    +  override def getStartOffset: Offset = {
    +    KafkaSourceOffset(startPartitionOffsets)
    +  }
    +
    +  override def getEndOffset: Offset = {
    +    KafkaSourceOffset(endPartitionOffsets)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
    +  }
    +
    +  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
    +
    +  override def commit(end: Offset): Unit = {}
    +
    +  override def stop(): Unit = {}
    +
    +  override def toString(): String = s"Kafka[$kafkaOffsetReader]"
    +
    +  /**
    +   * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
    +   * the checkpoint.
    +   */
    +  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
    +    // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
    +    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
    +    // (KAFKA-1894).
    +    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
    +
    +    // SparkSession is required for getting Hadoop configuration for writing to checkpoints
    +    assert(SparkSession.getActiveSession.nonEmpty)
    +
    +    val metadataLog =
    +      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
    +    metadataLog.get(0).getOrElse {
    +      val offsets = startingOffsets match {
    +        case EarliestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
    +        case LatestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
    +        case SpecificOffsetRangeLimit(p) =>
    +          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
    +      }
    +      metadataLog.add(0, offsets)
    +      logInfo(s"Initial offsets: $offsets")
    +      offsets
    +    }.partitionToOffsets
    +  }
    +
    +  /** Proportionally distribute limit number of offsets among topicpartitions */
    +  private def rateLimit(
    +      limit: Long,
    +      from: PartitionOffsetMap,
    +      until: PartitionOffsetMap): PartitionOffsetMap = {
    +    val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
    +    val sizes = until.flatMap {
    +      case (tp, end) =>
    +        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
    +        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
    +          val size = end - begin
    +          logDebug(s"rateLimit $tp size is $size")
    +          if (size > 0) Some(tp -> size) else None
    +        }
    +    }
    +    val total = sizes.values.sum.toDouble
    +    if (total < 1) {
    +      until
    +    } else {
    +      until.map {
    +        case (tp, end) =>
    +          tp -> sizes.get(tp).map { size =>
    +            val begin = from.get(tp).getOrElse(fromNew(tp))
    +            val prorate = limit * (size / total)
    +            // Don't completely starve small topicpartitions
    +            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
    +            // Paranoia, make sure not to return an offset that's past end
    +            Math.min(end, off)
    +          }.getOrElse(end)
    +      }
    +    }
    +  }
    +
    +  private def getSortedExecutorList(): Array[String] = {
    +
    +    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
    +      if (a.host == b.host) {
    +        a.executorId > b.executorId
    +      } else {
    +        a.host > b.host
    +      }
    +    }
    +
    +    val bm = SparkEnv.get.blockManager
    +    bm.master.getPeers(bm.blockManagerId).toArray
    +      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
    +      .sortWith(compare)
    +      .map(_.toString)
    +  }
    +
    +  /**
    +   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
    +   * Otherwise, just log a warning.
    +   */
    +  private def reportDataLoss(message: String): Unit = {
    +    if (failOnDataLoss) {
    +      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
    +    } else {
    +      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
    +    }
    +  }
    +
    +  /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
    +  class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
    +    extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
    +
    +    val VERSION = 1
    +
    +    override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
    +      out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
    +      val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
    +      writer.write("v" + VERSION + "\n")
    +      writer.write(metadata.json)
    +      writer.flush
    +    }
    +
    +    override def deserialize(in: InputStream): KafkaSourceOffset = {
    +      in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
    +      val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
    +      // HDFSMetadataLog guarantees that it never creates a partial file.
    +      assert(content.length != 0)
    +      if (content(0) == 'v') {
    +        val indexOfNewLine = content.indexOf("\n")
    +        if (indexOfNewLine > 0) {
    +          val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
    +          KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
    +        } else {
    +          throw new IllegalStateException(
    +            s"Log file was malformed: failed to detect the log file version line.")
    +        }
    +      } else {
    +        // The log was generated by Spark 2.1.0
    +        KafkaSourceOffset(SerializedOffset(content))
    +      }
    +    }
    +  }
    +}
    +
    +/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
    +private[kafka010] class KafkaMicroBatchDataReaderFactory(
    +    range: KafkaOffsetRange,
    +    preferredLoc: Option[String],
    +    executorKafkaParams: ju.Map[String, Object],
    +    pollTimeoutMs: Long,
    +    failOnDataLoss: Boolean) extends DataReaderFactory[Row] {
    +
    +  override def preferredLocations(): Array[String] = preferredLoc.toArray
    +
    +  override def createDataReader(): DataReader[Row] = new KafkaMicroBatchDataReader(
    +    range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
    +}
    +
    +/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
    +private[kafka010] class KafkaMicroBatchDataReader(
    +    offsetRange: KafkaOffsetRange,
    +    executorKafkaParams: ju.Map[String, Object],
    +    pollTimeoutMs: Long,
    +    failOnDataLoss: Boolean) extends DataReader[Row] with Logging {
    +
    +  private val consumer = CachedKafkaConsumer.getOrCreate(
    +    offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
    +  private val rangeToRead = resolveRange(offsetRange)
    +  private val timestamp = new java.sql.Timestamp(0)
    +
    +  private var nextOffset = rangeToRead.fromOffset
    +  private var nextRow: Row = _
    +
    +  override def next(): Boolean = {
    +    if (nextOffset < rangeToRead.untilOffset) {
    +      val cr = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
    +      if (cr != null) {
    +        timestamp.setTime(cr.timestamp)
    +        nextRow = Row(
    --- End diff --
    
    I checked the code, and I don't think there should be a performance regression. Here is how the data is processed in each case:
    - Old source: wraps objects into GenericInternalRow, and then LogicalRDD/RDDScanExec essentially uses an UnsafeProjection to write them into an UnsafeRow before returning it as an InternalRow.
    - New BatchReader: wraps objects into GenericRow, and then DataSourceV2Relation/DataSourceV2ScanExec/DataSourceRDD essentially uses a RowEncoder to write them into an UnsafeRow before returning it as an InternalRow.
    
    I am running tests to measure the throughput of the two sources. Since we are not removing the V1 source, we can always fall back to V1 and fix V2 later.
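
Editorial note for readers following the diff above: the `rateLimit` method splits the `maxOffsetsPerTrigger` budget across partitions in proportion to each partition's backlog. The following is a minimal standalone sketch of that idea, not the code from the pull request. The `RateLimitSketch` object and its local `TopicPartition` case class are hypothetical stand-ins so that the example needs neither Kafka nor Spark, and it assumes every partition in `until` already has a starting offset in `from` (the real method also looks up earliest offsets for newly added partitions).

```scala
object RateLimitSketch {
  // Hypothetical stand-in for Kafka's TopicPartition, so the sketch has no dependencies.
  final case class TopicPartition(topic: String, partition: Int)

  type PartitionOffsetMap = Map[TopicPartition, Long]

  /**
   * Split `limit` offsets across partitions in proportion to each partition's backlog.
   * Simplification: every partition in `until` is assumed to have an entry in `from`.
   */
  def rateLimit(
      limit: Long,
      from: PartitionOffsetMap,
      until: PartitionOffsetMap): PartitionOffsetMap = {
    // Backlog per partition; partitions with nothing new to read are left out.
    val sizes = until.flatMap { case (tp, end) =>
      from.get(tp).map(begin => tp -> (end - begin)).filter(_._2 > 0)
    }
    val total = sizes.values.sum.toDouble
    if (total < 1) {
      // Nothing to read anywhere, so the end offsets are returned unchanged.
      until
    } else {
      until.map { case (tp, end) =>
        tp -> sizes.get(tp).map { size =>
          val prorate = limit * (size / total)
          // Round tiny shares up so that small partitions are not starved.
          val step = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
          // Never move past the latest available offset.
          Math.min(end, from(tp) + step)
        }.getOrElse(end)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val a = TopicPartition("t", 0)
    val b = TopicPartition("t", 1)
    val from = Map(a -> 0L, b -> 0L)
    val until = Map(a -> 75L, b -> 25L)
    println(rateLimit(8L, from, until))
  }
}
```

With a limit of 8 and backlogs of 75 and 25, the two partitions advance by 6 and 2 offsets respectively. Because shares below 1 are rounded up, the total handed out can slightly exceed the limit when there are many small partitions; the sketch keeps that behaviour rather than correcting it.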



---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/906/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87497/
    Test PASSed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167123837
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.io._
    +import java.nio.charset.StandardCharsets
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.kafka.common.TopicPartition
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler.ExecutorCacheTaskLocation
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
    +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.UninterruptibleThread
    +
    +/**
    + * A [[MicroBatchReader]] that reads data from Kafka.
    + *
    + * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
    + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
    + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
    + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
    + * with the semantics of `KafkaConsumer.position()`.
    + *
    + * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
    + * must make sure all messages in a topic have been processed before deleting the topic.
    + *
    + * There is a known issue caused by KAFKA-1894: a query using Kafka may not be stoppable.
    + * To avoid this issue, make sure to stop the query before stopping the Kafka brokers,
    + * and do not use wrong broker addresses.
    + */
    +private[kafka010] class KafkaMicroBatchReader(
    +    kafkaOffsetReader: KafkaOffsetReader,
    +    executorKafkaParams: ju.Map[String, Object],
    +    options: DataSourceOptions,
    +    metadataPath: String,
    +    startingOffsets: KafkaOffsetRangeLimit,
    +    failOnDataLoss: Boolean)
    +  extends MicroBatchReader with Logging {
    +
    +  type PartitionOffsetMap = Map[TopicPartition, Long]
    +
    +  private var startPartitionOffsets: PartitionOffsetMap = _
    +  private var endPartitionOffsets: PartitionOffsetMap = _
    +
    +  private val pollTimeoutMs = options.getLong(
    +    "kafkaConsumer.pollTimeoutMs",
    +    SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
    +
    +  private val maxOffsetsPerTrigger =
    +    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
    +
    +  /**
    +   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    +   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
    +   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    +   */
    +  private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
    +
    +  override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
    +    // Make sure initialPartitionOffsets is initialized
    +    initialPartitionOffsets
    +
    +    startPartitionOffsets = Option(start.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse(initialPartitionOffsets)
    +
    +    endPartitionOffsets = Option(end.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse {
    +          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
    +          maxOffsetsPerTrigger.map { maxOffsets =>
    +            rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
    +          }.getOrElse {
    +            latestPartitionOffsets
    +          }
    +        }
    +  }
    +
    +  override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
    +    // Find the new partitions, and get their earliest offsets
    +    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
    +    val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
    +    if (newPartitionOffsets.keySet != newPartitions) {
    +      // We cannot get from offsets for some partitions. It means they got deleted.
    +      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
    +      reportDataLoss(
    +        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
    +    }
    +    logInfo(s"Partitions added: $newPartitionOffsets")
    +    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
    +      reportDataLoss(
    +        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
    +    }
    +
    +    // Find deleted partitions, and report data loss if required
    +    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
    +    if (deletedPartitions.nonEmpty) {
    +      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
    +    }
    +
    +    // Use the until partitions to calculate offset ranges to ignore partitions that have
    +    // been deleted
    +    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
    +      // Ignore partitions that we don't know the from offsets.
    +      newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
    +    }.toSeq
    +    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
    +
    +    val sortedExecutors = getSortedExecutorList()
    +    val numExecutors = sortedExecutors.length
    +    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
    +
    +    // Calculate offset ranges
    +    val factories = topicPartitions.flatMap { tp =>
    +      val fromOffset = startPartitionOffsets.get(tp).getOrElse {
    +        newPartitionOffsets.getOrElse(
    +        tp, {
    +          // This should not happen since newPartitionOffsets contains all partitions not in
    +          // fromPartitionOffsets
    +          throw new IllegalStateException(s"$tp doesn't have a from offset")
    +        })
    +      }
    +      val untilOffset = endPartitionOffsets(tp)
    +
    +      if (untilOffset >= fromOffset) {
    +        // This allows cached KafkaConsumers in the executors to be re-used to read the same
    +        // partition in every batch.
    +        val preferredLoc = if (numExecutors > 0) {
    +          Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
    +        } else None
    +        val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
    +        Some(
    +          new KafkaMicroBatchDataReaderFactory(
    +            range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
    +      } else {
    +        reportDataLoss(
    +          s"Partition $tp's offset was changed from " +
    +            s"$fromOffset to $untilOffset, some data may have been missed")
    +        None
    +      }
    +    }
    +    factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
    +  }
    +
    +  override def getStartOffset: Offset = {
    +    KafkaSourceOffset(startPartitionOffsets)
    +  }
    +
    +  override def getEndOffset: Offset = {
    +    KafkaSourceOffset(endPartitionOffsets)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
    +  }
    +
    +  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
    +
    +  override def commit(end: Offset): Unit = {}
    +
    +  override def stop(): Unit = {}
    +
    +  override def toString(): String = s"Kafka[$kafkaOffsetReader]"
    +
    +  /**
    +   * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
    +   * the checkpoint.
    +   */
    +  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
    +    // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
    +    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
    +    // (KAFKA-1894).
    +    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
    +
    +    // SparkSession is required for getting Hadoop configuration for writing to checkpoints
    +    assert(SparkSession.getActiveSession.nonEmpty)
    +
    +    val metadataLog =
    +      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
    +    metadataLog.get(0).getOrElse {
    +      val offsets = startingOffsets match {
    +        case EarliestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
    +        case LatestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
    +        case SpecificOffsetRangeLimit(p) =>
    +          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
    +      }
    +      metadataLog.add(0, offsets)
    +      logInfo(s"Initial offsets: $offsets")
    +      offsets
    +    }.partitionToOffsets
    +  }
    +
    +  /** Proportionally distribute limit number of offsets among topicpartitions */
    +  private def rateLimit(
    --- End diff --
    
    Same as KafkaSource.rateLimit.
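
The proration that `rateLimit` performs can be illustrated with a small standalone sketch. This is not the code from the PR: partitions are plain `String` keys instead of `TopicPartition`, the object name `RateLimitSketch` is hypothetical, and the lookup of earliest offsets for newly added partitions is left out, so a partition missing from `from` simply keeps its end offset.

```scala
object RateLimitSketch {
  // Hypothetical simplification: partition name -> offset.
  type Offsets = Map[String, Long]

  /** Spread `limit` offsets over partitions in proportion to each partition's backlog. */
  def rateLimit(limit: Long, from: Offsets, until: Offsets): Offsets = {
    // Backlog per partition; partitions with no known start or nothing new are skipped.
    val sizes: Map[String, Long] = until.collect {
      case (tp, end) if from.get(tp).exists(begin => end - begin > 0) =>
        tp -> (end - from(tp))
    }
    val total = sizes.values.sum.toDouble
    if (total < 1) {
      until
    } else {
      until.map { case (tp, end) =>
        tp -> sizes.get(tp).map { size =>
          val prorate = limit * (size / total)
          // Round shares below 1 up so that small partitions are not starved.
          val step = (if (prorate < 1) math.ceil(prorate) else math.floor(prorate)).toLong
          // Never return an offset past the end of the partition.
          math.min(end, from(tp) + step)
        }.getOrElse(end)
      }
    }
  }
}
```

With a limit of 10, start offsets `a -> 0, b -> 0` and latest offsets `a -> 30, b -> 10`, the backlog is split 3:1, so the sketch returns `a -> 7, b -> 2`. Because shares are rounded down, the total can come in under the limit.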


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87238 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87238/testReport)** for PR 20554 at commit [`3ed2a50`](https://github.com/apache/spark/commit/3ed2a509276194214875f39e1e18d8093155c54c).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87238/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/732/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167815276
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.io._
    +import java.nio.charset.StandardCharsets
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.kafka.common.TopicPartition
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler.ExecutorCacheTaskLocation
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
    +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.UninterruptibleThread
    +
    +/**
    + * A [[MicroBatchReader]] that reads data from Kafka.
    + *
    + * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
    + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
    + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
    + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
    + * with the semantics of `KafkaConsumer.position()`.
    + *
    + * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
    + * must make sure all messages in a topic have been processed when deleting a topic.
    + *
    + * There is a known issue caused by KAFKA-1894: a query using Kafka may be impossible to stop.
    + * To avoid this issue, make sure to stop the query before stopping the Kafka brokers, and do not
    + * use wrong broker addresses.
    + */
    +private[kafka010] class KafkaMicroBatchReader(
    +    kafkaOffsetReader: KafkaOffsetReader,
    +    executorKafkaParams: ju.Map[String, Object],
    +    options: DataSourceOptions,
    +    metadataPath: String,
    +    startingOffsets: KafkaOffsetRangeLimit,
    +    failOnDataLoss: Boolean)
    +  extends MicroBatchReader with Logging {
    +
    +  type PartitionOffsetMap = Map[TopicPartition, Long]
    +
    +  private var startPartitionOffsets: PartitionOffsetMap = _
    +  private var endPartitionOffsets: PartitionOffsetMap = _
    +
    +  private val pollTimeoutMs = options.getLong(
    +    "kafkaConsumer.pollTimeoutMs",
    +    SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
    +
    +  private val maxOffsetsPerTrigger =
    +    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
    +
    +  /**
    +   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    +   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
    +   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    +   */
    +  private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
    +
    +  override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
    +    // Make sure initialPartitionOffsets is initialized
    +    initialPartitionOffsets
    +
    +    startPartitionOffsets = Option(start.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse(initialPartitionOffsets)
    +
    +    endPartitionOffsets = Option(end.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse {
    +          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
    +          maxOffsetsPerTrigger.map { maxOffsets =>
    +            rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
    +          }.getOrElse {
    +            latestPartitionOffsets
    +          }
    +        }
    +  }
    +
    +  override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
    +    // Find the new partitions, and get their earliest offsets
    +    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
    +    val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
    +    if (newPartitionOffsets.keySet != newPartitions) {
    +      // We cannot get from offsets for some partitions. It means they got deleted.
    +      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
    +      reportDataLoss(
    +        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
    +    }
    +    logInfo(s"Partitions added: $newPartitionOffsets")
    +    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
    +      reportDataLoss(
    +        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
    +    }
    +
    +    // Find deleted partitions, and report data loss if required
    +    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
    +    if (deletedPartitions.nonEmpty) {
    +      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
    +    }
    +
    +    // Use the until partitions to calculate offset ranges to ignore partitions that have
    +    // been deleted
    +    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
    +      // Ignore partitions whose from offsets we don't know.
    +      newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
    +    }.toSeq
    +    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
    +
    +    val sortedExecutors = getSortedExecutorList()
    +    val numExecutors = sortedExecutors.length
    +    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
    +
    +    // Calculate offset ranges
    +    val factories = topicPartitions.flatMap { tp =>
    +      val fromOffset = startPartitionOffsets.get(tp).getOrElse {
    +        newPartitionOffsets.getOrElse(
    +        tp, {
    +          // This should not happen since newPartitionOffsets contains all partitions not in
    +          // startPartitionOffsets
    +          throw new IllegalStateException(s"$tp doesn't have a from offset")
    +        })
    +      }
    +      val untilOffset = endPartitionOffsets(tp)
    +
    +      if (untilOffset >= fromOffset) {
    +        // This allows cached KafkaConsumers in the executors to be re-used to read the same
    +        // partition in every batch.
    +        val preferredLoc = if (numExecutors > 0) {
    +          Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
    +        } else None
    +        val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
    +        Some(
    +          new KafkaMicroBatchDataReaderFactory(
    +            range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
    +      } else {
    +        reportDataLoss(
    +          s"Partition $tp's offset was changed from " +
    +            s"$fromOffset to $untilOffset, some data may have been missed")
    +        None
    +      }
    +    }
    +    factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
    +  }
    +
    +  override def getStartOffset: Offset = {
    +    KafkaSourceOffset(startPartitionOffsets)
    +  }
    +
    +  override def getEndOffset: Offset = {
    +    KafkaSourceOffset(endPartitionOffsets)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
    +  }
    +
    +  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
    +
    +  override def commit(end: Offset): Unit = {}
    +
    +  override def stop(): Unit = {}
    +
    +  override def toString(): String = s"Kafka[$kafkaOffsetReader]"
    +
    +  /**
    +   * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
    +   * the checkpoint.
    +   */
    +  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
    +    // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
    +    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
    +    // (KAFKA-1894).
    +    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
    +
    +    // SparkSession is required for getting Hadoop configuration for writing to checkpoints
    +    assert(SparkSession.getActiveSession.nonEmpty)
    +
    +    val metadataLog =
    +      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
    +    metadataLog.get(0).getOrElse {
    +      val offsets = startingOffsets match {
    +        case EarliestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
    +        case LatestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
    +        case SpecificOffsetRangeLimit(p) =>
    +          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
    +      }
    +      metadataLog.add(0, offsets)
    +      logInfo(s"Initial offsets: $offsets")
    +      offsets
    +    }.partitionToOffsets
    +  }
    +
    +  /** Proportionally distribute limit number of offsets among topicpartitions */
    +  private def rateLimit(
    +      limit: Long,
    +      from: PartitionOffsetMap,
    +      until: PartitionOffsetMap): PartitionOffsetMap = {
    +    val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
    +    val sizes = until.flatMap {
    +      case (tp, end) =>
    +        // If begin isn't defined, something's wrong, but let the data-loss checks in
    +        // createDataReaderFactories handle it
    +        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
    +          val size = end - begin
    +          logDebug(s"rateLimit $tp size is $size")
    +          if (size > 0) Some(tp -> size) else None
    +        }
    +    }
    +    val total = sizes.values.sum.toDouble
    +    if (total < 1) {
    +      until
    +    } else {
    +      until.map {
    +        case (tp, end) =>
    +          tp -> sizes.get(tp).map { size =>
    +            val begin = from.get(tp).getOrElse(fromNew(tp))
    +            val prorate = limit * (size / total)
    +            // Don't completely starve small topicpartitions
    +            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
    +            // Paranoia, make sure not to return an offset that's past end
    +            Math.min(end, off)
    +          }.getOrElse(end)
    +      }
    +    }
    +  }
    +
    +  private def getSortedExecutorList(): Array[String] = {
    +
    +    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
    +      if (a.host == b.host) {
    +        a.executorId > b.executorId
    +      } else {
    +        a.host > b.host
    +      }
    +    }
    +
    +    val bm = SparkEnv.get.blockManager
    +    bm.master.getPeers(bm.blockManagerId).toArray
    +      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
    +      .sortWith(compare)
    +      .map(_.toString)
    +  }
    +
    +  /**
    +   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
    +   * Otherwise, just log a warning.
    +   */
    +  private def reportDataLoss(message: String): Unit = {
    +    if (failOnDataLoss) {
    +      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
    +    } else {
    +      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
    +    }
    +  }
    +
    +  /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets */
    +  class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
    +    extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
    +
    +    val VERSION = 1
    +
    +    override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
    +      out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
    +      val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
    +      writer.write("v" + VERSION + "\n")
    +      writer.write(metadata.json)
    +      writer.flush
    +    }
    +
    +    override def deserialize(in: InputStream): KafkaSourceOffset = {
    +      in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
    +      val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
    +      // HDFSMetadataLog guarantees that it never creates a partial file.
    +      assert(content.length != 0)
    +      if (content(0) == 'v') {
    +        val indexOfNewLine = content.indexOf("\n")
    +        if (indexOfNewLine > 0) {
    +          val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
    +          KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
    +        } else {
    +          throw new IllegalStateException(
    +            s"Log file was malformed: failed to detect the log file version line.")
    +        }
    +      } else {
    +        // The log was generated by Spark 2.1.0
    +        KafkaSourceOffset(SerializedOffset(content))
    +      }
    +    }
    +  }
    +}
    +
    +/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
    +private[kafka010] class KafkaMicroBatchDataReaderFactory(
    +    range: KafkaOffsetRange,
    +    preferredLoc: Option[String],
    +    executorKafkaParams: ju.Map[String, Object],
    +    pollTimeoutMs: Long,
    +    failOnDataLoss: Boolean) extends DataReaderFactory[Row] {
    +
    +  override def preferredLocations(): Array[String] = preferredLoc.toArray
    +
    +  override def createDataReader(): DataReader[Row] = new KafkaMicroBatchDataReader(
    +    range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
    +}
    +
    +/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
    +private[kafka010] class KafkaMicroBatchDataReader(
    +    offsetRange: KafkaOffsetRange,
    +    executorKafkaParams: ju.Map[String, Object],
    +    pollTimeoutMs: Long,
    +    failOnDataLoss: Boolean) extends DataReader[Row] with Logging {
    +
    +  private val consumer = CachedKafkaConsumer.getOrCreate(
    +    offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
    +  private val rangeToRead = resolveRange(offsetRange)
    +  private val timestamp = new java.sql.Timestamp(0)
    +
    +  private var nextOffset = rangeToRead.fromOffset
    +  private var nextRow: Row = _
    +
    +  override def next(): Boolean = {
    +    if (nextOffset < rangeToRead.untilOffset) {
    +      val cr = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
    +      if (cr != null) {
    +        timestamp.setTime(cr.timestamp)
    +        nextRow = Row(
    --- End diff --
    
    Spark needs to convert Row to InternalRow. Right? Do we have an API to avoid this performance regression?


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87446/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/892/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/907/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87446 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87446/testReport)** for PR 20554 at commit [`55c057d`](https://github.com/apache/spark/commit/55c057d1291c927c05192e9b2ce95388fe814c1a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87465/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87444 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87444/testReport)** for PR 20554 at commit [`677f897`](https://github.com/apache/spark/commit/677f89764dd04d6c0088af3bf39d5a049777d079).


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r168558972
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -112,14 +112,18 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
             query.nonEmpty,
             "Cannot add data when there is no query for finding the active kafka source")
     
    -      val sources = query.get.logicalPlan.collect {
    -        case StreamingExecutionRelation(source: KafkaSource, _) => source
    -      } ++ (query.get.lastExecution match {
    -        case null => Seq()
    -        case e => e.logical.collect {
    -          case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
    -        }
    -      })
    +      val sources = {
    +        query.get.logicalPlan.collect {
    +          case StreamingExecutionRelation(source: KafkaSource, _) => source
    +          case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => source
    +        } ++ (query.get.lastExecution match {
    +          case null => Seq()
    +          case e => e.logical.collect {
    +            case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
    +          }
    +        })
    +      }.distinct
    --- End diff --
    
    Is the distinct for the self join test?


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87444/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87433/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87436/
    Test PASSed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r168626487
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -112,14 +112,18 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
             query.nonEmpty,
             "Cannot add data when there is no query for finding the active kafka source")
     
    -      val sources = query.get.logicalPlan.collect {
    -        case StreamingExecutionRelation(source: KafkaSource, _) => source
    -      } ++ (query.get.lastExecution match {
    -        case null => Seq()
    -        case e => e.logical.collect {
    -          case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
    -        }
    -      })
    +      val sources = {
    +        query.get.logicalPlan.collect {
    +          case StreamingExecutionRelation(source: KafkaSource, _) => source
    +          case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => source
    +        } ++ (query.get.lastExecution match {
    +          case null => Seq()
    +          case e => e.logical.collect {
    +            case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
    +          }
    +        })
    +      }.distinct
    --- End diff --
    
    yes.
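
A minimal sketch of why `.distinct` matters here, under the assumption that a self join places the same source instance in the plan twice. The `Source`, `Relation` and `Join` types below are hypothetical stand-ins, not Spark classes.

```scala
object DistinctSourcesSketch {
  // Hypothetical stand-ins for a streaming source and logical plan nodes.
  final class Source(val name: String)
  sealed trait Plan
  final case class Relation(source: Source) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan

  /** Collect every source referenced by the plan, in traversal order. */
  def sources(plan: Plan): Seq[Source] = plan match {
    case Relation(s) => Seq(s)
    case Join(l, r) => sources(l) ++ sources(r)
  }
}
```

For `Join(Relation(s), Relation(s))` the traversal returns `s` twice, so a check that expects exactly one Kafka source would fail unless the result is deduplicated with `.distinct`.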


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Build finished. Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87436 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87436/testReport)** for PR 20554 at commit [`f9983d9`](https://github.com/apache/spark/commit/f9983d937b97b2ba9f020f370b8aefdd353a654b).


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167124346
  
    --- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin ---
    @@ -0,0 +1,2 @@
    +0v99999
    +{"kafka-initial-offset-future-version":{"2":2,"1":1,"0":0}}
    --- End diff --
    
    note: should remove the newline to keep it consistent
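
The fixture quoted above carries a version marker (`v99999`) after its leading byte, followed by the JSON offset map on the next line. Below is a minimal sketch of how a reader might reject a marker newer than it understands, which is presumably what a "future version" fixture is meant to exercise. `VersionLine.parse` is a hypothetical helper written for illustration only; it is not Spark's `parseVersion`, whose exact behaviour is not shown in this thread.

```scala
object VersionLine {
  /**
   * Parses a marker such as "v1" (the text between the leading byte and the
   * first newline) and rejects anything above `maxSupported`.
   * Hypothetical helper; the names and messages are assumptions.
   */
  def parse(line: String, maxSupported: Int): Int = {
    def malformed(): Nothing =
      throw new IllegalStateException(s"Malformed version line: $line")

    if (line.isEmpty || line.head != 'v') malformed()
    // Everything after the 'v' must be an integer.
    val version = scala.util.Try(line.tail.toInt).getOrElse(malformed())
    if (version < 1 || version > maxSupported) {
      throw new IllegalStateException(
        s"Unsupported log version v$version (max supported: v$maxSupported)")
    }
    version
  }
}
```

With `maxSupported = 1`, `VersionLine.parse("v1", 1)` returns the version number, while a marker like `v99999` throws.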


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87446 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87446/testReport)** for PR 20554 at commit [`55c057d`](https://github.com/apache/spark/commit/55c057d1291c927c05192e9b2ce95388fe814c1a).


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167124768
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
    @@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
         )
       }
     
    -  testWithUninterruptibleThread(
    -    "deserialization of initial offset with Spark 2.1.0") {
    -    withTempDir { metadataPath =>
    -      val topic = newTopic
    -      testUtils.createTopic(topic, partitions = 3)
    -
    -      val provider = new KafkaSourceProvider
    -      val parameters = Map(
    -        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
    -        "subscribe" -> topic
    -      )
    -      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
    -        "", parameters)
    -      source.getOffset.get // Write initial offset
    -
    -      // Make sure Spark 2.1.0 will throw an exception when reading the new log
    -      intercept[java.lang.IllegalArgumentException] {
    -        // Simulate how Spark 2.1.0 reads the log
    -        Utils.tryWithResource(new FileInputStream(metadataPath.getAbsolutePath + "/0")) { in =>
    -          val length = in.read()
    -          val bytes = new Array[Byte](length)
    -          in.read(bytes)
    -          KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
    -        }
    -      }
    -    }
    -  }
    -
    -  testWithUninterruptibleThread("deserialization of initial offset written by Spark 2.1.0") {
    +  test("deserialization of initial offset written by Spark 2.1.0") {
         withTempDir { metadataPath =>
    --- End diff --
    
    Changed the two tests below so that they no longer use the source/reader directly (that made them too dependent on low-level implementation details). They now run an actual streaming query using the sample initial offset files in `test/resources`.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167123513
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.io._
    +import java.nio.charset.StandardCharsets
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.kafka.common.TopicPartition
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler.ExecutorCacheTaskLocation
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
    +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.UninterruptibleThread
    +
    +/**
    + * A [[MicroBatchReader]] that reads data from Kafka.
    + *
    + * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
    + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
    + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
    + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it
    + * consistent with the semantics of `KafkaConsumer.position()`.
    + *
    + * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the
    + * user must make sure all messages in a topic have been processed before deleting the topic.
    + *
    + * There is a known issue caused by KAFKA-1894: a query using Kafka may not be stoppable.
    + * To avoid this issue, make sure to stop the query before stopping the Kafka brokers,
    + * and do not use wrong broker addresses.
    + */
    +private[kafka010] class KafkaMicroBatchReader(
    +    kafkaOffsetReader: KafkaOffsetReader,
    +    executorKafkaParams: ju.Map[String, Object],
    +    options: DataSourceOptions,
    +    metadataPath: String,
    +    startingOffsets: KafkaOffsetRangeLimit,
    +    failOnDataLoss: Boolean)
    +  extends MicroBatchReader with Logging {
    +
    +  type PartitionOffsetMap = Map[TopicPartition, Long]
    +
    +  private var startPartitionOffsets: PartitionOffsetMap = _
    +  private var endPartitionOffsets: PartitionOffsetMap = _
    +
    +  private val pollTimeoutMs = options.getLong(
    +    "kafkaConsumer.pollTimeoutMs",
    +    SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
    +
    +  private val maxOffsetsPerTrigger =
    +    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
    +
    +  /**
    +   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    +   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
    +   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    +   */
    +  private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
    +
    +  override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
    +    // Make sure initialPartitionOffsets is initialized
    +    initialPartitionOffsets
    +
    +    startPartitionOffsets = Option(start.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse(initialPartitionOffsets)
    +
    +    endPartitionOffsets = Option(end.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse {
    +          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
    +          maxOffsetsPerTrigger.map { maxOffsets =>
    +            rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
    +          }.getOrElse {
    +            latestPartitionOffsets
    +          }
    +        }
    +  }
    +
    +  override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
    +    // Find the new partitions, and get their earliest offsets
    +    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
    +    val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
    +    if (newPartitionOffsets.keySet != newPartitions) {
    +      // We cannot get from offsets for some partitions. It means they got deleted.
    +      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
    +      reportDataLoss(
    +        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
    +    }
    +    logInfo(s"Partitions added: $newPartitionOffsets")
    +    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
    +      reportDataLoss(
    +        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
    +    }
    +
    +    // Find deleted partitions, and report data loss if required
    +    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
    +    if (deletedPartitions.nonEmpty) {
    +      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
    +    }
    +
    +    // Use the until partitions to calculate offset ranges to ignore partitions that have
    +    // been deleted
    +    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
    +      // Ignore partitions that we don't know the from offsets.
    +      newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
    +    }.toSeq
    +    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
    +
    +    val sortedExecutors = getSortedExecutorList()
    +    val numExecutors = sortedExecutors.length
    +    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
    +
    +    // Calculate offset ranges
    +    val factories = topicPartitions.flatMap { tp =>
    +      val fromOffset = startPartitionOffsets.get(tp).getOrElse {
    +        newPartitionOffsets.getOrElse(
    +        tp, {
    +          // This should not happen since newPartitionOffsets contains all partitions not in
    +          // startPartitionOffsets
    +          throw new IllegalStateException(s"$tp doesn't have a from offset")
    +        })
    +      }
    +      val untilOffset = endPartitionOffsets(tp)
    +
    +      if (untilOffset >= fromOffset) {
    +        // This allows cached KafkaConsumers in the executors to be re-used to read the same
    +        // partition in every batch.
    +        val preferredLoc = if (numExecutors > 0) {
    +          Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
    +        } else None
    +        val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
    +        Some(
    +          new KafkaMicroBatchDataReaderFactory(
    +            range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
    +      } else {
    +        reportDataLoss(
    +          s"Partition $tp's offset was changed from " +
    +            s"$fromOffset to $untilOffset, some data may have been missed")
    +        None
    +      }
    +    }
    +    factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
    +  }
    +
    +  override def getStartOffset: Offset = {
    +    KafkaSourceOffset(startPartitionOffsets)
    +  }
    +
    +  override def getEndOffset: Offset = {
    +    KafkaSourceOffset(endPartitionOffsets)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
    +  }
    +
    +  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
    +
    +  override def commit(end: Offset): Unit = {}
    +
    +  override def stop(): Unit = {}
    +
    +  override def toString(): String = s"Kafka[$kafkaOffsetReader]"
    +
    +  /**
    +   * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
    +   * the checkpoint.
    +   */
    +  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
    +    // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
    +    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
    +    // (KAFKA-1894).
    +    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
    +
    +    // SparkSession is required for getting Hadoop configuration for writing to checkpoints
    +    assert(SparkSession.getActiveSession.nonEmpty)
    +
    +    val metadataLog =
    +      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
    +    metadataLog.get(0).getOrElse {
    +      val offsets = startingOffsets match {
    +        case EarliestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
    +        case LatestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
    +        case SpecificOffsetRangeLimit(p) =>
    +          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
    +      }
    +      metadataLog.add(0, offsets)
    +      logInfo(s"Initial offsets: $offsets")
    +      offsets
    +    }.partitionToOffsets
    +  }
    +
    +  /** Proportionally distribute limit number of offsets among topicpartitions */
    +  private def rateLimit(
    +      limit: Long,
    +      from: PartitionOffsetMap,
    +      until: PartitionOffsetMap): PartitionOffsetMap = {
    +    val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
    +    val sizes = until.flatMap {
    +      case (tp, end) =>
    +        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
    +        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
    +          val size = end - begin
    +          logDebug(s"rateLimit $tp size is $size")
    +          if (size > 0) Some(tp -> size) else None
    +        }
    +    }
    +    val total = sizes.values.sum.toDouble
    +    if (total < 1) {
    +      until
    +    } else {
    +      until.map {
    +        case (tp, end) =>
    +          tp -> sizes.get(tp).map { size =>
    +            val begin = from.get(tp).getOrElse(fromNew(tp))
    +            val prorate = limit * (size / total)
    +            // Don't completely starve small topicpartitions
    +            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
    +            // Paranoia, make sure not to return an offset that's past end
    +            Math.min(end, off)
    +          }.getOrElse(end)
    +      }
    +    }
    +  }
    +
    +  private def getSortedExecutorList(): Array[String] = {
    +
    +    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
    +      if (a.host == b.host) {
    +        a.executorId > b.executorId
    +      } else {
    +        a.host > b.host
    +      }
    +    }
    +
    +    val bm = SparkEnv.get.blockManager
    +    bm.master.getPeers(bm.blockManagerId).toArray
    +      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
    +      .sortWith(compare)
    +      .map(_.toString)
    +  }
    +
    +  /**
    +   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
    +   * Otherwise, just log a warning.
    +   */
    +  private def reportDataLoss(message: String): Unit = {
    +    if (failOnDataLoss) {
    +      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
    +    } else {
    +      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
    +    }
    +  }
    +
    +  /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
    +  class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
    +    extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
    +
    +    val VERSION = 1
    +
    +    override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
    +      out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
    +      val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
    +      writer.write("v" + VERSION + "\n")
    +      writer.write(metadata.json)
    +      writer.flush
    +    }
    +
    +    override def deserialize(in: InputStream): KafkaSourceOffset = {
    +      in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
    +      val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
    +      // HDFSMetadataLog guarantees that it never creates a partial file.
    +      assert(content.length != 0)
    +      if (content(0) == 'v') {
    +        val indexOfNewLine = content.indexOf("\n")
    +        if (indexOfNewLine > 0) {
    +          val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
    +          KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
    +        } else {
    +          throw new IllegalStateException(
    +            s"Log file was malformed: failed to detect the log file version line.")
    +        }
    +      } else {
    +        // The log was generated by Spark 2.1.0
    +        KafkaSourceOffset(SerializedOffset(content))
    +      }
    +    }
    +  }
    +}
    +
    +/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
    +private[kafka010] class KafkaMicroBatchDataReaderFactory(
    +    range: KafkaOffsetRange,
    +    preferredLoc: Option[String],
    +    executorKafkaParams: ju.Map[String, Object],
    +    pollTimeoutMs: Long,
    +    failOnDataLoss: Boolean) extends DataReaderFactory[Row] {
    +
    +  override def preferredLocations(): Array[String] = preferredLoc.toArray
    +
    +  override def createDataReader(): DataReader[Row] = new KafkaMicroBatchDataReader(
    +    range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
    +}
    +
    +/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
    +private[kafka010] class KafkaMicroBatchDataReader(
    +    offsetRange: KafkaOffsetRange,
    +    executorKafkaParams: ju.Map[String, Object],
    +    pollTimeoutMs: Long,
    +    failOnDataLoss: Boolean) extends DataReader[Row] with Logging {
    +
    +  private val consumer = CachedKafkaConsumer.getOrCreate(
    +    offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
    +  private val rangeToRead = resolveRange(offsetRange)
    +  private val timestamp = new java.sql.Timestamp(0)
    +
    +  private var nextOffset = rangeToRead.fromOffset
    +  private var nextRow: Row = _
    +
    +  override def next(): Boolean = {
    --- End diff --
    
    This code has been copied from the `KafkaSourceRDD`'s iterator code.
    https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala#L148
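
For readers following the `rateLimit` method in the diff above, here is a minimal standalone sketch of the same proportional split. It makes two simplifying assumptions that are not in the patch: partitions are identified by plain strings rather than `TopicPartition`, and `from` already holds a starting offset for every partition of interest (the patch instead fetches earliest offsets for new partitions). `RateLimitSketch` is a hypothetical name.

```scala
object RateLimitSketch {
  type Offsets = Map[String, Long]

  /** Splits `limit` across partitions in proportion to their pending records. */
  def rateLimit(limit: Long, from: Offsets, until: Offsets): Offsets = {
    // Pending records per partition; partitions with nothing to read are left out.
    val sizes: Map[String, Long] = until.collect {
      case (tp, end) if from.get(tp).exists(_ < end) => tp -> (end - from(tp))
    }
    val total = sizes.values.sum.toDouble
    if (total < 1) {
      until // nothing pending, so there is nothing to limit
    } else {
      until.map { case (tp, end) =>
        tp -> sizes.get(tp).map { size =>
          val prorate = limit * (size / total)
          // Round tiny shares up so that small partitions are not starved.
          val share = (if (prorate < 1) math.ceil(prorate) else math.floor(prorate)).toLong
          // Never return an offset past the known end.
          math.min(end, from(tp) + share)
        }.getOrElse(end)
      }
    }
  }
}
```

For example, with a limit of 10 and two partitions holding 30 and 10 pending records, the shares are 7.5 and 2.5, which are floored to 7 and 2. Because of the rounding, the sum of the shares can differ slightly from the limit.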
    



---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/891/
    Test PASSed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167127585
  
    --- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin ---
    @@ -1 +1 @@
    -2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
    \ No newline at end of file
    +2{"kafka-initial-offset-2-1-0":{"2":2,"1":1,"0":0}}
    --- End diff --
    
    Why does this need to be modified? The point of this file IIUC is to ensure that compatibility is maintained with offsets logged in old versions, so I worry something's wrong if we need to update it.
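
As background for the compatibility concern: the `deserialize` method quoted elsewhere in this thread skips one leading byte and then accepts either a versioned payload (a `v...` line followed by JSON) or a bare JSON payload, which its comment attributes to Spark 2.1.0. The sketch below reproduces only that branching, using the standard library alone. `InitialOffsetLogSketch` and its method names are hypothetical, and version validation is deliberately left out.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8

object InitialOffsetLogSketch {
  /** Returns the JSON payload from either the versioned or the legacy layout. */
  def readOffsetJson(in: InputStream): String = {
    in.read() // skip the single leading byte that both layouts carry
    val content = scala.io.Source.fromInputStream(in, "UTF-8").mkString
    if (content.isEmpty) throw new IllegalStateException("Empty offset log")
    if (content.head == 'v') {
      // Versioned layout: "v<N>\n" followed by the JSON payload.
      val newline = content.indexOf('\n')
      if (newline <= 0) {
        throw new IllegalStateException("Malformed log: no version line found")
      }
      content.substring(newline + 1)
    } else {
      // Legacy layout: the payload follows the first byte directly.
      content
    }
  }

  /** Convenience wrapper for trying the reader on an in-memory string. */
  def fromString(s: String): String =
    readOffsetJson(new ByteArrayInputStream(s.getBytes(UTF_8)))
}
```

Both `fromString("0v1\n{\"t\":{\"0\":0}}")` and `fromString("2{\"t\":{\"0\":0}}")` yield the same JSON string, which is the property a compatibility fixture of this kind relies on.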


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    retest this please


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87498 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87498/testReport)** for PR 20554 at commit [`2dea08a`](https://github.com/apache/spark/commit/2dea08a4c5f85991e4ad4c7da886c2e0bf456bb8).


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    LGTM


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167124564
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
    @@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
         )
       }
     
    -  testWithUninterruptibleThread(
    --- End diff --
    
    I think this test is superfluous and does not test anything useful. As with the other modified tests, "simulating" an implementation is a BAD test, and in this particular case it is attempting to simulate the 2.1.0 log, which is not necessary any more.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87497 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87497/testReport)** for PR 20554 at commit [`7f5df22`](https://github.com/apache/spark/commit/7f5df222da2e6cf59ed632b1c05165f1035202f3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Build finished. Test PASSed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r168559060
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
    @@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
         )
       }
     
    -  testWithUninterruptibleThread(
    --- End diff --
    
    +1


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87436 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87436/testReport)** for PR 20554 at commit [`f9983d9`](https://github.com/apache/spark/commit/f9983d937b97b2ba9f020f370b8aefdd353a654b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r168591005
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.io._
    +import java.nio.charset.StandardCharsets
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.kafka.common.TopicPartition
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler.ExecutorCacheTaskLocation
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
    +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.UninterruptibleThread
    +
    +/**
    + * A [[MicroBatchReader]] that reads data from Kafka.
    + *
    + * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
    + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
    + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
    + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
    + * with the semantics of `KafkaConsumer.position()`.
    + *
    + * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the
    + * user must make sure all messages in a topic have been processed before deleting the topic.
    + *
    + * There is a known issue caused by KAFKA-1894: a query using Kafka may fail to stop.
    + * To avoid this issue, make sure to stop the query before stopping the Kafka brokers,
    + * and do not use wrong broker addresses.
    + */
    +private[kafka010] class KafkaMicroBatchReader(
    +    kafkaOffsetReader: KafkaOffsetReader,
    +    executorKafkaParams: ju.Map[String, Object],
    +    options: DataSourceOptions,
    +    metadataPath: String,
    +    startingOffsets: KafkaOffsetRangeLimit,
    +    failOnDataLoss: Boolean)
    +  extends MicroBatchReader with Logging {
    +
    +  type PartitionOffsetMap = Map[TopicPartition, Long]
    +
    +  private var startPartitionOffsets: PartitionOffsetMap = _
    +  private var endPartitionOffsets: PartitionOffsetMap = _
    +
    +  private val pollTimeoutMs = options.getLong(
    +    "kafkaConsumer.pollTimeoutMs",
    +    SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
    +
    +  private val maxOffsetsPerTrigger =
    +    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
    +
    +  /**
    +   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    +   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
    +   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    +   */
    +  private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
    +
    +  override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
    +    // Make sure initialPartitionOffsets is initialized
    +    initialPartitionOffsets
    +
    +    startPartitionOffsets = Option(start.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse(initialPartitionOffsets)
    +
    +    endPartitionOffsets = Option(end.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse {
    +          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
    +          maxOffsetsPerTrigger.map { maxOffsets =>
    +            rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
    +          }.getOrElse {
    +            latestPartitionOffsets
    +          }
    +        }
    +  }
    +
    +  override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
    +    // Find the new partitions, and get their earliest offsets
    +    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
    +    val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
    +    if (newPartitionOffsets.keySet != newPartitions) {
    +      // We cannot get from offsets for some partitions. It means they got deleted.
    +      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
    +      reportDataLoss(
    +        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
    +    }
    +    logInfo(s"Partitions added: $newPartitionOffsets")
    +    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
    +      reportDataLoss(
    +        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
    +    }
    +
    +    // Find deleted partitions, and report data loss if required
    +    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
    +    if (deletedPartitions.nonEmpty) {
    +      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
    +    }
    +
    +    // Use the until partitions to calculate offset ranges to ignore partitions that have
    +    // been deleted
    +    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
    +      // Ignore partitions that we don't know the from offsets.
    +      newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
    +    }.toSeq
    +    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
    +
    +    val sortedExecutors = getSortedExecutorList()
    +    val numExecutors = sortedExecutors.length
    +    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
    +
    +    // Calculate offset ranges
    +    val factories = topicPartitions.flatMap { tp =>
    +      val fromOffset = startPartitionOffsets.get(tp).getOrElse {
    +        newPartitionOffsets.getOrElse(
    +        tp, {
    +          // This should not happen since newPartitionOffsets contains all partitions not in
    +          // startPartitionOffsets
    +          throw new IllegalStateException(s"$tp doesn't have a from offset")
    +        })
    +      }
    +      val untilOffset = endPartitionOffsets(tp)
    +
    +      if (untilOffset >= fromOffset) {
    +        // This allows cached KafkaConsumers in the executors to be re-used to read the same
    +        // partition in every batch.
    +        val preferredLoc = if (numExecutors > 0) {
    +          Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
    +        } else None
    +        val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
    +        Some(
    +          new KafkaMicroBatchDataReaderFactory(
    +            range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
    +      } else {
    +        reportDataLoss(
    +          s"Partition $tp's offset was changed from " +
    +            s"$fromOffset to $untilOffset, some data may have been missed")
    +        None
    +      }
    +    }
    +    factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
    +  }
    +
    +  override def getStartOffset: Offset = {
    +    KafkaSourceOffset(startPartitionOffsets)
    +  }
    +
    +  override def getEndOffset: Offset = {
    +    KafkaSourceOffset(endPartitionOffsets)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
    +  }
    +
    +  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
    +
    +  override def commit(end: Offset): Unit = {}
    +
    +  override def stop(): Unit = {}
    +
    +  override def toString(): String = s"Kafka[$kafkaOffsetReader]"
    +
    +  /**
    +   * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
    +   * the checkpoint.
    +   */
    +  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
    +    // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
    +    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
    +    // (KAFKA-1894).
    +    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
    +
    +    // SparkSession is required for getting Hadoop configuration for writing to checkpoints
    +    assert(SparkSession.getActiveSession.nonEmpty)
    +
    +    val metadataLog =
    +      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
    +    metadataLog.get(0).getOrElse {
    +      val offsets = startingOffsets match {
    +        case EarliestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
    +        case LatestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
    +        case SpecificOffsetRangeLimit(p) =>
    +          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
    +      }
    +      metadataLog.add(0, offsets)
    +      logInfo(s"Initial offsets: $offsets")
    +      offsets
    +    }.partitionToOffsets
    +  }
    +
    +  /** Proportionally distribute limit number of offsets among topicpartitions */
    +  private def rateLimit(
    +      limit: Long,
    +      from: PartitionOffsetMap,
    +      until: PartitionOffsetMap): PartitionOffsetMap = {
    +    val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
    +    val sizes = until.flatMap {
    +      case (tp, end) =>
    +        // If begin isn't defined, something's wrong, but let the alert logic in
    +        // createDataReaderFactories handle it
    +        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
    +          val size = end - begin
    +          logDebug(s"rateLimit $tp size is $size")
    +          if (size > 0) Some(tp -> size) else None
    +        }
    +    }
    +    val total = sizes.values.sum.toDouble
    +    if (total < 1) {
    +      until
    +    } else {
    +      until.map {
    +        case (tp, end) =>
    +          tp -> sizes.get(tp).map { size =>
    +            val begin = from.get(tp).getOrElse(fromNew(tp))
    +            val prorate = limit * (size / total)
    +            // Don't completely starve small topicpartitions
    +            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
    +            // Paranoia, make sure not to return an offset that's past end
    +            Math.min(end, off)
    +          }.getOrElse(end)
    +      }
    +    }
    +  }
    +
    +  private def getSortedExecutorList(): Array[String] = {
    +
    +    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
    +      if (a.host == b.host) {
    +        a.executorId > b.executorId
    +      } else {
    +        a.host > b.host
    +      }
    +    }
    +
    +    val bm = SparkEnv.get.blockManager
    +    bm.master.getPeers(bm.blockManagerId).toArray
    +      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
    +      .sortWith(compare)
    +      .map(_.toString)
    +  }
    +
    +  /**
    +   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
    +   * Otherwise, just log a warning.
    +   */
    +  private def reportDataLoss(message: String): Unit = {
    +    if (failOnDataLoss) {
    +      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
    +    } else {
    +      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
    +    }
    +  }
    +
    +  /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
    +  class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
    +    extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
    +
    +    val VERSION = 1
    +
    +    override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
    +      out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
    +      val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
    +      writer.write("v" + VERSION + "\n")
    +      writer.write(metadata.json)
    +      writer.flush
    +    }
    +
    +    override def deserialize(in: InputStream): KafkaSourceOffset = {
    +      in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
    +      val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
    +      // HDFSMetadataLog guarantees that it never creates a partial file.
    +      assert(content.length != 0)
    +      if (content(0) == 'v') {
    +        val indexOfNewLine = content.indexOf("\n")
    +        if (indexOfNewLine > 0) {
    +          val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
    +          KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
    +        } else {
    +          throw new IllegalStateException(
    +            s"Log file was malformed: failed to detect the log file version line.")
    +        }
    +      } else {
    +        // The log was generated by Spark 2.1.0
    +        KafkaSourceOffset(SerializedOffset(content))
    +      }
    +    }
    +  }
    +}
    +
    +/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
    +private[kafka010] class KafkaMicroBatchDataReaderFactory(
    +    range: KafkaOffsetRange,
    +    preferredLoc: Option[String],
    +    executorKafkaParams: ju.Map[String, Object],
    +    pollTimeoutMs: Long,
    +    failOnDataLoss: Boolean) extends DataReaderFactory[Row] {
    +
    +  override def preferredLocations(): Array[String] = preferredLoc.toArray
    +
    +  override def createDataReader(): DataReader[Row] = new KafkaMicroBatchDataReader(
    +    range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
    +}
    +
    +/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
    +private[kafka010] class KafkaMicroBatchDataReader(
    +    offsetRange: KafkaOffsetRange,
    +    executorKafkaParams: ju.Map[String, Object],
    +    pollTimeoutMs: Long,
    +    failOnDataLoss: Boolean) extends DataReader[Row] with Logging {
    +
    +  private val consumer = CachedKafkaConsumer.getOrCreate(
    +    offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
    +  private val rangeToRead = resolveRange(offsetRange)
    +  private val timestamp = new java.sql.Timestamp(0)
    +
    +  private var nextOffset = rangeToRead.fromOffset
    +  private var nextRow: Row = _
    +
    +  override def next(): Boolean = {
    +    if (nextOffset < rangeToRead.untilOffset) {
    +      val cr = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
    +      if (cr != null) {
    +        timestamp.setTime(cr.timestamp)
    +        nextRow = Row(
    --- End diff --
    
    I don't necessarily insist on changing it in this PR, but the V2 API does support creating UnsafeRow directly. See the row creation logic in KafkaContinuousDataReader.
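
As an aside for readers following the quoted diff: the proration in `rateLimit` can be tried out in isolation. The block below is a minimal sketch, not the PR's code. `TP` is a hypothetical stand-in for Kafka's `TopicPartition`, and the lookup of earliest offsets for newly added partitions is left out, so a partition without a known start simply keeps its end offset.

```scala
object RateLimitSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TP(topic: String, partition: Int)

  type Offsets = Map[TP, Long]

  /** Proportionally distribute `limit` offsets among partitions that have new data. */
  def rateLimit(limit: Long, from: Offsets, until: Offsets): Offsets = {
    // Backlog per partition; partitions with no known start or no new data are skipped.
    val sizes: Map[TP, Long] = until.toSeq.collect {
      case (tp, end) if from.get(tp).exists(_ < end) => tp -> (end - from(tp))
    }.toMap

    val total = sizes.values.sum.toDouble
    if (total < 1) {
      until
    } else {
      until.map { case (tp, end) =>
        tp -> sizes.get(tp).map { size =>
          val prorate = limit * (size / total)
          // Round up below 1 so that small partitions are not starved.
          val step = (if (prorate < 1) math.ceil(prorate) else math.floor(prorate)).toLong
          // Never return an offset past the available end.
          math.min(end, from(tp) + step)
        }.getOrElse(end)
      }
    }
  }
}
```

With a limit of 10, a partition holding 100 new records and another holding 1 get end offsets of 9 and 1 respectively: the large partition's share is floored, while the small one is rounded up to a single record.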


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167123614
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.io._
    +import java.nio.charset.StandardCharsets
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.kafka.common.TopicPartition
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler.ExecutorCacheTaskLocation
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
    +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.UninterruptibleThread
    +
    +/**
    + * A [[MicroBatchReader]] that reads data from Kafka.
    + *
    + * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
    + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
    + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
    + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
    + * with the semantics of `KafkaConsumer.position()`.
    + *
    + * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the
    + * user must make sure all messages in a topic have been processed before deleting the topic.
    + *
    + * There is a known issue caused by KAFKA-1894: a query using Kafka may fail to stop.
    + * To avoid this issue, make sure to stop the query before stopping the Kafka brokers,
    + * and do not use wrong broker addresses.
    + */
    +private[kafka010] class KafkaMicroBatchReader(
    +    kafkaOffsetReader: KafkaOffsetReader,
    +    executorKafkaParams: ju.Map[String, Object],
    +    options: DataSourceOptions,
    +    metadataPath: String,
    +    startingOffsets: KafkaOffsetRangeLimit,
    +    failOnDataLoss: Boolean)
    +  extends MicroBatchReader with Logging {
    +
    +  type PartitionOffsetMap = Map[TopicPartition, Long]
    +
    +  private var startPartitionOffsets: PartitionOffsetMap = _
    +  private var endPartitionOffsets: PartitionOffsetMap = _
    +
    +  private val pollTimeoutMs = options.getLong(
    +    "kafkaConsumer.pollTimeoutMs",
    +    SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
    +
    +  private val maxOffsetsPerTrigger =
    +    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
    +
    +  /**
    +   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    +   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
    +   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    +   */
    +  private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
    +
    +  override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
    +    // Make sure initialPartitionOffsets is initialized
    +    initialPartitionOffsets
    +
    +    startPartitionOffsets = Option(start.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse(initialPartitionOffsets)
    +
    +    endPartitionOffsets = Option(end.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse {
    +          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
    +          maxOffsetsPerTrigger.map { maxOffsets =>
    +            rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
    +          }.getOrElse {
    +            latestPartitionOffsets
    +          }
    +        }
    +  }
    +
    +  override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
    +    // Find the new partitions, and get their earliest offsets
    +    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
    +    val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
    +    if (newPartitionOffsets.keySet != newPartitions) {
    +      // We cannot get from offsets for some partitions. It means they got deleted.
    +      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
    +      reportDataLoss(
    +        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
    +    }
    +    logInfo(s"Partitions added: $newPartitionOffsets")
    +    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
    +      reportDataLoss(
    +        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
    +    }
    +
    +    // Find deleted partitions, and report data loss if required
    +    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
    +    if (deletedPartitions.nonEmpty) {
    +      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
    +    }
    +
    +    // Use the until partitions to calculate offset ranges to ignore partitions that have
    +    // been deleted
    +    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
    +      // Ignore partitions that we don't know the from offsets.
    +      newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
    +    }.toSeq
    +    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
    +
    +    val sortedExecutors = getSortedExecutorList()
    +    val numExecutors = sortedExecutors.length
    +    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
    +
    +    // Calculate offset ranges
    +    val factories = topicPartitions.flatMap { tp =>
    +      val fromOffset = startPartitionOffsets.get(tp).getOrElse {
    +        newPartitionOffsets.getOrElse(
    +        tp, {
    +          // This should not happen since newPartitionOffsets contains all partitions not in
    +          // startPartitionOffsets
    +          throw new IllegalStateException(s"$tp doesn't have a from offset")
    +        })
    +      }
    +      val untilOffset = endPartitionOffsets(tp)
    +
    +      if (untilOffset >= fromOffset) {
    +        // This allows cached KafkaConsumers in the executors to be re-used to read the same
    +        // partition in every batch.
    +        val preferredLoc = if (numExecutors > 0) {
    +          Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
    +        } else None
    +        val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
    +        Some(
    +          new KafkaMicroBatchDataReaderFactory(
    +            range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
    +      } else {
    +        reportDataLoss(
    +          s"Partition $tp's offset was changed from " +
    +            s"$fromOffset to $untilOffset, some data may have been missed")
    +        None
    +      }
    +    }
    +    factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
    +  }
    +
    +  override def getStartOffset: Offset = {
    +    KafkaSourceOffset(startPartitionOffsets)
    +  }
    +
    +  override def getEndOffset: Offset = {
    +    KafkaSourceOffset(endPartitionOffsets)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
    +  }
    +
    +  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
    +
    +  override def commit(end: Offset): Unit = {}
    +
    +  override def stop(): Unit = {}
    +
    +  override def toString(): String = s"Kafka[$kafkaOffsetReader]"
    +
    +  /**
    +   * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
    +   * the checkpoint.
    +   */
    +  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
    +    // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
    +    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
    +    // (KAFKA-1894).
    +    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
    +
    +    // SparkSession is required for getting Hadoop configuration for writing to checkpoints
    +    assert(SparkSession.getActiveSession.nonEmpty)
    +
    +    val metadataLog =
    +      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
    +    metadataLog.get(0).getOrElse {
    +      val offsets = startingOffsets match {
    +        case EarliestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
    +        case LatestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
    +        case SpecificOffsetRangeLimit(p) =>
    +          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
    +      }
    +      metadataLog.add(0, offsets)
    +      logInfo(s"Initial offsets: $offsets")
    +      offsets
    +    }.partitionToOffsets
    +  }
    +
    +  /** Proportionally distribute limit number of offsets among topicpartitions */
    +  private def rateLimit(
    +      limit: Long,
    +      from: PartitionOffsetMap,
    +      until: PartitionOffsetMap): PartitionOffsetMap = {
    +    val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
    +    val sizes = until.flatMap {
    +      case (tp, end) =>
    +        // If begin isn't defined, something's wrong, but let the alert logic in
    +        // createDataReaderFactories handle it
    +        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
    +          val size = end - begin
    +          logDebug(s"rateLimit $tp size is $size")
    +          if (size > 0) Some(tp -> size) else None
    +        }
    +    }
    +    val total = sizes.values.sum.toDouble
    +    if (total < 1) {
    +      until
    +    } else {
    +      until.map {
    +        case (tp, end) =>
    +          tp -> sizes.get(tp).map { size =>
    +            val begin = from.get(tp).getOrElse(fromNew(tp))
    +            val prorate = limit * (size / total)
    +            // Don't completely starve small topicpartitions
    +            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
    +            // Paranoia, make sure not to return an offset that's past end
    +            Math.min(end, off)
    +          }.getOrElse(end)
    +      }
    +    }
    +  }
    +
    +  private def getSortedExecutorList(): Array[String] = {
    +
    +    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
    +      if (a.host == b.host) {
    +        a.executorId > b.executorId
    +      } else {
    +        a.host > b.host
    +      }
    +    }
    +
    +    val bm = SparkEnv.get.blockManager
    +    bm.master.getPeers(bm.blockManagerId).toArray
    +      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
    +      .sortWith(compare)
    +      .map(_.toString)
    +  }
    +
    +  /**
    +   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
    +   * Otherwise, just log a warning.
    +   */
    +  private def reportDataLoss(message: String): Unit = {
    +    if (failOnDataLoss) {
    +      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
    +    } else {
    +      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
    +    }
    +  }
    +
    +  /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
    +  class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
    +    extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
    +
    +    val VERSION = 1
    +
    +    override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
    +      out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
    +      val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
    +      writer.write("v" + VERSION + "\n")
    +      writer.write(metadata.json)
    +      writer.flush
    +    }
    +
    +    override def deserialize(in: InputStream): KafkaSourceOffset = {
    +      in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
    +      val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
    +      // HDFSMetadataLog guarantees that it never creates a partial file.
    +      assert(content.length != 0)
    +      if (content(0) == 'v') {
    +        val indexOfNewLine = content.indexOf("\n")
    +        if (indexOfNewLine > 0) {
    +          val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
    +          KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
    +        } else {
    +          throw new IllegalStateException(
    +            s"Log file was malformed: failed to detect the log file version line.")
    +        }
    +      } else {
    +        // The log was generated by Spark 2.1.0
    +        KafkaSourceOffset(SerializedOffset(content))
    +      }
    +    }
    +  }
    +}
    +
    +/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
    +private[kafka010] class KafkaMicroBatchDataReaderFactory(
    +    range: KafkaOffsetRange,
    +    preferredLoc: Option[String],
    +    executorKafkaParams: ju.Map[String, Object],
    +    pollTimeoutMs: Long,
    +    failOnDataLoss: Boolean) extends DataReaderFactory[Row] {
    +
    +  override def preferredLocations(): Array[String] = preferredLoc.toArray
    +
    +  override def createDataReader(): DataReader[Row] = new KafkaMicroBatchDataReader(
    +    range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
    +}
    +
    +/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
    +private[kafka010] class KafkaMicroBatchDataReader(
    +    offsetRange: KafkaOffsetRange,
    +    executorKafkaParams: ju.Map[String, Object],
    +    pollTimeoutMs: Long,
    +    failOnDataLoss: Boolean) extends DataReader[Row] with Logging {
    +
    +  private val consumer = CachedKafkaConsumer.getOrCreate(
    +    offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
    +  private val rangeToRead = resolveRange(offsetRange)
    +  private val timestamp = new java.sql.Timestamp(0)
    +
    +  private var nextOffset = rangeToRead.fromOffset
    +  private var nextRow: Row = _
    +
    +  override def next(): Boolean = {
    +    if (nextOffset < rangeToRead.untilOffset) {
    +      val cr = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
    +      if (cr != null) {
    +        timestamp.setTime(cr.timestamp)
    +        nextRow = Row(
    +          cr.key,
    +          cr.value,
    +          cr.topic,
    +          cr.partition,
    +          cr.offset,
    +          timestamp,
    +          cr.timestampType.id)
    +        true
    +      } else {
    +        false
    +      }
    +    } else {
    +      false
    +    }
    +  }
    +
    +  override def get(): Row = {
    +    assert(nextRow != null)
    +    nextOffset += 1
    +    nextRow
    +  }
    +
    +  override def close(): Unit = {
    +    // Indicate that we're no longer using this consumer
    +    CachedKafkaConsumer.releaseKafkaConsumer(
    +      offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
    +  }
    +
    +  private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
    +    if (range.fromOffset < 0 || range.untilOffset < 0) {
    +      // Late bind the offset range
    +      val availableOffsetRange = consumer.getAvailableOffsetRange()
    +      val fromOffset = if (range.fromOffset < 0) {
    +        assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST,
    +          s"earliest offset ${range.fromOffset} does not equal ${KafkaOffsetRangeLimit.EARLIEST}")
    +        availableOffsetRange.earliest
    +      } else {
    +        range.fromOffset
    +      }
    +      val untilOffset = if (range.untilOffset < 0) {
    +        assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST,
    +          s"latest offset ${range.untilOffset} does not equal ${KafkaOffsetRangeLimit.LATEST}")
    +        availableOffsetRange.latest
    +      } else {
    +        range.untilOffset
    +      }
    +      KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset)
    +    } else {
    +      range
    +    }
    +  }
    +}
    +
    +private[kafka010] case class KafkaOffsetRange(
    +  topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long)
    +
    +
    --- End diff --
    
    nit: will remove this extra line.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167123917
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.io._
    +import java.nio.charset.StandardCharsets
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.kafka.common.TopicPartition
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler.ExecutorCacheTaskLocation
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
    +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.UninterruptibleThread
    +
    +/**
    + * A [[MicroBatchReader]] that reads data from Kafka.
    + *
    + * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
    + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
    + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
    + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
    + * with the semantics of `KafkaConsumer.position()`.
    + *
    + * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
    + * must make sure all messages in a topic have been processed before deleting the topic.
    + *
    + * There is a known issue caused by KAFKA-1894: a query using Kafka may not be stoppable.
    + * To avoid this issue, make sure to stop the query before stopping the Kafka brokers,
    + * and do not use wrong broker addresses.
    + */
    +private[kafka010] class KafkaMicroBatchReader(
    +    kafkaOffsetReader: KafkaOffsetReader,
    +    executorKafkaParams: ju.Map[String, Object],
    +    options: DataSourceOptions,
    +    metadataPath: String,
    +    startingOffsets: KafkaOffsetRangeLimit,
    +    failOnDataLoss: Boolean)
    +  extends MicroBatchReader with Logging {
    +
    +  type PartitionOffsetMap = Map[TopicPartition, Long]
    +
    +  private var startPartitionOffsets: PartitionOffsetMap = _
    +  private var endPartitionOffsets: PartitionOffsetMap = _
    +
    +  private val pollTimeoutMs = options.getLong(
    +    "kafkaConsumer.pollTimeoutMs",
    +    SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
    +
    +  private val maxOffsetsPerTrigger =
    +    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
    +
    +  /**
    +   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    +   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
    +   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    +   */
    +  private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
    +
    +  override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
    +    // Make sure initialPartitionOffsets is initialized
    +    initialPartitionOffsets
    +
    +    startPartitionOffsets = Option(start.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse(initialPartitionOffsets)
    +
    +    endPartitionOffsets = Option(end.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse {
    +          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
    +          maxOffsetsPerTrigger.map { maxOffsets =>
    +            rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
    +          }.getOrElse {
    +            latestPartitionOffsets
    +          }
    +        }
    +  }
    +
    +  override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
    +    // Find the new partitions, and get their earliest offsets
    +    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
    +    val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
    +    if (newPartitionOffsets.keySet != newPartitions) {
    +      // We cannot get from offsets for some partitions. It means they got deleted.
    +      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
    +      reportDataLoss(
    +        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
    +    }
    +    logInfo(s"Partitions added: $newPartitionOffsets")
    +    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
    +      reportDataLoss(
    +        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
    +    }
    +
    +    // Find deleted partitions, and report data loss if required
    +    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
    +    if (deletedPartitions.nonEmpty) {
    +      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
    +    }
    +
    +    // Use the until partitions to calculate offset ranges to ignore partitions that have
    +    // been deleted
    +    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
    +      // Ignore partitions that we don't know the from offsets.
    +      newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
    +    }.toSeq
    +    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
    +
    +    val sortedExecutors = getSortedExecutorList()
    +    val numExecutors = sortedExecutors.length
    +    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
    +
    +    // Calculate offset ranges
    +    val factories = topicPartitions.flatMap { tp =>
    +      val fromOffset = startPartitionOffsets.get(tp).getOrElse {
    +        newPartitionOffsets.getOrElse(
    +        tp, {
    +          // This should not happen since newPartitionOffsets contains all partitions not in
    +          // fromPartitionOffsets
    +          throw new IllegalStateException(s"$tp doesn't have a from offset")
    +        })
    +      }
    +      val untilOffset = endPartitionOffsets(tp)
    +
    +      if (untilOffset >= fromOffset) {
    +        // This allows cached KafkaConsumers in the executors to be re-used to read the same
    +        // partition in every batch.
    +        val preferredLoc = if (numExecutors > 0) {
    +          Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
    +        } else None
    +        val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
    +        Some(
    +          new KafkaMicroBatchDataReaderFactory(
    +            range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
    +      } else {
    +        reportDataLoss(
    +          s"Partition $tp's offset was changed from " +
    +            s"$fromOffset to $untilOffset, some data may have been missed")
    +        None
    +      }
    +    }
    +    factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
    +  }
    +
    +  override def getStartOffset: Offset = {
    +    KafkaSourceOffset(startPartitionOffsets)
    +  }
    +
    +  override def getEndOffset: Offset = {
    +    KafkaSourceOffset(endPartitionOffsets)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
    +  }
    +
    +  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
    +
    +  override def commit(end: Offset): Unit = {}
    +
    +  override def stop(): Unit = {}
    +
    +  override def toString(): String = s"Kafka[$kafkaOffsetReader]"
    +
    +  /**
    +   * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
    +   * the checkpoint.
    +   */
    +  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
    --- End diff --
    
    Moved the code from `KafkaSource.initialPartitionOffsets` into a separate function for cleanliness.
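
    For illustration only, the get-or-create behaviour named in the doc comment quoted above (read the initial offsets from the checkpoint, or decide them and write them to the checkpoint) can be sketched as follows. This is a minimal sketch under assumptions: `InMemoryOffsetLog` and `InitialOffsets` are hypothetical stand-ins rather than Spark classes, partitions are keyed by plain strings, and storing the value under batch id 0 is an assumption of the sketch.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a checkpoint metadata log; not Spark's HDFSMetadataLog.
final class InMemoryOffsetLog {
  private val batches = mutable.Map.empty[Long, Map[String, Long]]
  def get(batchId: Long): Option[Map[String, Long]] = batches.get(batchId)
  def add(batchId: Long, offsets: Map[String, Long]): Unit = batches.update(batchId, offsets)
}

object InitialOffsets {
  // Return the offsets already stored under batch 0 (an assumption of this sketch);
  // otherwise evaluate `decide` once, persist the result, and return it.
  def getOrCreate(log: InMemoryOffsetLog)(decide: => Map[String, Long]): Map[String, Long] =
    log.get(0L).getOrElse {
      val offsets = decide
      log.add(0L, offsets)
      offsets
    }
}
```

    Because `decide` is a by-name parameter, it is only evaluated when nothing has been stored yet, so a restarted query would keep the offsets chosen on its first run.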



---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87239 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87239/testReport)** for PR 20554 at commit [`05c9d20`](https://github.com/apache/spark/commit/05c9d20da4361d631d8839bd4a45e4966964afa0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87497 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87497/testReport)** for PR 20554 at commit [`7f5df22`](https://github.com/apache/spark/commit/7f5df222da2e6cf59ed632b1c05165f1035202f3).


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r168625742
  
    --- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin ---
    @@ -1 +1 @@
    -2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
    \ No newline at end of file
    +2{"kafka-initial-offset-2-1-0":{"2":2,"1":1,"0":0}}
    --- End diff --
    
    I modified the file to make the test "deserialization of initial offset written by Spark 2.1.0" stronger. See the updated test. The query now starts from the earliest offset while the checkpointed initial offsets are NOT at offset 0, and we check that the query reads the first offset given in the initial offsets rather than the earliest offset available in the topic. Hence I am changing the values in the file slightly, not the format.
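
    As a rough picture of what this check amounts to, the sketch below lists the offsets a query would read from each partition given its starting offsets. It is only an illustration: `InitialOffsetCheck`, `offsetsRead`, the integer partition ids, and the end offsets used with it are hypothetical and are not taken from the test suite.

```scala
object InitialOffsetCheck {
  // For each partition, list the offsets a query would read when it starts at
  // `start(p)` and the partition currently ends (exclusively) at `end(p)`.
  // A partition missing from `end` yields an empty list.
  def offsetsRead(start: Map[Int, Long], end: Map[Int, Long]): Map[Int, List[Long]] =
    start.map { case (p, from) => p -> (from until end.getOrElse(p, from)).toList }
}
```

    With starting offsets `Map(2 -> 2L, 1 -> 1L, 0 -> 0L)`, matching the values in the updated resource file, the first offset read from partition 2 is 2 and not 0, which is the distinction the strengthened test relies on.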


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87238 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87238/testReport)** for PR 20554 at commit [`3ed2a50`](https://github.com/apache/spark/commit/3ed2a509276194214875f39e1e18d8093155c54c).


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r168366098
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
    @@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
         )
       }
     
    -  testWithUninterruptibleThread(
    --- End diff --
    
    Added 


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167124308
  
    --- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin ---
    @@ -1 +1 @@
    -2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
    \ No newline at end of file
    +2{"kafka-initial-offset-2-1-0":{"2":2,"1":1,"0":0}}
    --- End diff --
    
    note: should remove the newline to keep it consistent.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167123199
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -408,8 +401,27 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
       private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
       private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
    +
       val TOPIC_OPTION_KEY = "topic"
     
    +  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
    --- End diff --
    
    Moved this from KafkaSource to this class because it is used by multiple reader classes and therefore belongs in the higher-level class (i.e. the provider class).


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87466 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87466/testReport)** for PR 20554 at commit [`ba4724b`](https://github.com/apache/spark/commit/ba4724bfbae88266c2b565c709e1f4ff4794ef57).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/881/


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Build finished. Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r168558562
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
    @@ -306,7 +307,7 @@ private[kafka010] class KafkaSource(
         kafkaReader.close()
       }
     
    -  override def toString(): String = s"KafkaSource[$kafkaReader]"
    +  override def toString(): String = s"KafkaSourceV1[$kafkaReader]"
    --- End diff --
    
    good catch


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r168625863
  
    --- Diff: external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin ---
    @@ -0,0 +1,2 @@
    +0v99999
    +{"kafka-initial-offset-future-version":{"2":2,"1":1,"0":0}}
    --- End diff --
    
    done


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87444 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87444/testReport)** for PR 20554 at commit [`677f897`](https://github.com/apache/spark/commit/677f89764dd04d6c0088af3bf39d5a049777d079).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    @jose-torres  @zsxwing please take a look.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167807584
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.io._
    +import java.nio.charset.StandardCharsets
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.kafka.common.TopicPartition
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler.ExecutorCacheTaskLocation
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
    +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.UninterruptibleThread
    +
    +/**
    + * A [[MicroBatchReader]] that reads data from Kafka.
    + *
    + * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
    + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
    + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
    + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
    + * with the semantics of `KafkaConsumer.position()`.
    + *
    + * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
    + * must make sure all messages in a topic have been processed before deleting the topic.
    + *
    + * There is a known issue caused by KAFKA-1894: a query using Kafka may not be stoppable.
    + * To avoid this issue, make sure to stop the query before stopping the Kafka brokers,
    + * and do not use wrong broker addresses.
    + */
    +private[kafka010] class KafkaMicroBatchReader(
    +    kafkaOffsetReader: KafkaOffsetReader,
    +    executorKafkaParams: ju.Map[String, Object],
    +    options: DataSourceOptions,
    +    metadataPath: String,
    +    startingOffsets: KafkaOffsetRangeLimit,
    +    failOnDataLoss: Boolean)
    +  extends MicroBatchReader with Logging {
    +
    +  type PartitionOffsetMap = Map[TopicPartition, Long]
    +
    +  private var startPartitionOffsets: PartitionOffsetMap = _
    +  private var endPartitionOffsets: PartitionOffsetMap = _
    +
    +  private val pollTimeoutMs = options.getLong(
    +    "kafkaConsumer.pollTimeoutMs",
    +    SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
    +
    +  private val maxOffsetsPerTrigger =
    +    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
    +
    +  /**
    +   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    +   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
    +   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    +   */
    +  private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
    +
    +  override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
    +    // Make sure initialPartitionOffsets is initialized
    +    initialPartitionOffsets
    +
    +    startPartitionOffsets = Option(start.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse(initialPartitionOffsets)
    +
    +    endPartitionOffsets = Option(end.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse {
    +          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
    +          maxOffsetsPerTrigger.map { maxOffsets =>
    +            rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
    +          }.getOrElse {
    +            latestPartitionOffsets
    +          }
    +        }
    +  }
    +
    +  override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
    +    // Find the new partitions, and get their earliest offsets
    +    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
    +    val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
    +    if (newPartitionOffsets.keySet != newPartitions) {
    +      // We cannot get from offsets for some partitions. It means they got deleted.
    +      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
    +      reportDataLoss(
    +        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
    +    }
    +    logInfo(s"Partitions added: $newPartitionOffsets")
    +    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
    +      reportDataLoss(
    +        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
    +    }
    +
    +    // Find deleted partitions, and report data loss if required
    +    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
    +    if (deletedPartitions.nonEmpty) {
    +      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
    +    }
    +
    +    // Use the until partitions to calculate offset ranges to ignore partitions that have
    +    // been deleted
    +    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
    +      // Ignore partitions that we don't know the from offsets.
    +      newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
    +    }.toSeq
    +    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
    +
    +    val sortedExecutors = getSortedExecutorList()
    +    val numExecutors = sortedExecutors.length
    +    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
    +
    +    // Calculate offset ranges
    +    val factories = topicPartitions.flatMap { tp =>
    +      val fromOffset = startPartitionOffsets.get(tp).getOrElse {
    +        newPartitionOffsets.getOrElse(
    +        tp, {
    +          // This should not happen since newPartitionOffsets contains all partitions not in
    +          // fromPartitionOffsets
    +          throw new IllegalStateException(s"$tp doesn't have a from offset")
    +        })
    +      }
    +      val untilOffset = endPartitionOffsets(tp)
    +
    +      if (untilOffset >= fromOffset) {
    +        // This allows cached KafkaConsumers in the executors to be re-used to read the same
    +        // partition in every batch.
    +        val preferredLoc = if (numExecutors > 0) {
    +          Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
    +        } else None
    +        val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
    +        Some(
    +          new KafkaMicroBatchDataReaderFactory(
    +            range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
    +      } else {
    +        reportDataLoss(
    +          s"Partition $tp's offset was changed from " +
    +            s"$fromOffset to $untilOffset, some data may have been missed")
    +        None
    +      }
    +    }
    +    factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
    +  }
    +
    +  override def getStartOffset: Offset = {
    +    KafkaSourceOffset(startPartitionOffsets)
    +  }
    +
    +  override def getEndOffset: Offset = {
    +    KafkaSourceOffset(endPartitionOffsets)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
    +  }
    +
    +  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
    +
    +  override def commit(end: Offset): Unit = {}
    +
    +  override def stop(): Unit = {}
    --- End diff --
    
    This method should close the reader.
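
One way to read the suggestion above, as a minimal sketch: `stop()` releases the driver-side offset reader instead of being a no-op. This assumes the "reader" in question is the `kafkaOffsetReader` passed to the constructor and that it exposes a `close()` method. `OffsetReaderStub` and `MicroBatchReaderSketch` are hypothetical stand-ins so the snippet runs without Spark or Kafka on the classpath.

```scala
// Hypothetical stand-in for the driver-side KafkaOffsetReader; only close() matters here.
final class OffsetReaderStub {
  @volatile private var closed = false
  def close(): Unit = { closed = true }
  def isClosed: Boolean = closed
}

// Hypothetical, trimmed-down reader that shows only the lifecycle hook.
final class MicroBatchReaderSketch(offsetReader: OffsetReaderStub) {
  /** Releases driver-side resources when the streaming query stops. */
  def stop(): Unit = offsetReader.close()
}
```

In the real class the body would presumably be a single call on `kafkaOffsetReader`; whether further cleanup is needed is not visible in this diff.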


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/884/
    Test PASSed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20554


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87465 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87465/testReport)** for PR 20554 at commit [`66f3ef2`](https://github.com/apache/spark/commit/66f3ef2d70bf9e7468bd28ee4a4f34c35726fd98).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167123713
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.io._
    +import java.nio.charset.StandardCharsets
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.kafka.common.TopicPartition
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler.ExecutorCacheTaskLocation
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
    +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.UninterruptibleThread
    +
    +/**
    + * A [[MicroBatchReader]] that reads data from Kafka.
    + *
    + * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
    + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
    + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
    + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
    + * with the semantics of `KafkaConsumer.position()`.
    + *
    + * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
    + * must make sure all messages in a topic have been processed before deleting the topic.
    + *
    + * There is a known issue caused by KAFKA-1894: a query using Kafka may fail to stop.
    + * To avoid this issue, make sure to stop the query before stopping the Kafka brokers,
    + * and do not use wrong broker addresses.
    + */
    +private[kafka010] class KafkaMicroBatchReader(
    +    kafkaOffsetReader: KafkaOffsetReader,
    +    executorKafkaParams: ju.Map[String, Object],
    +    options: DataSourceOptions,
    +    metadataPath: String,
    +    startingOffsets: KafkaOffsetRangeLimit,
    +    failOnDataLoss: Boolean)
    +  extends MicroBatchReader with Logging {
    +
    +  type PartitionOffsetMap = Map[TopicPartition, Long]
    +
    +  private var startPartitionOffsets: PartitionOffsetMap = _
    +  private var endPartitionOffsets: PartitionOffsetMap = _
    +
    +  private val pollTimeoutMs = options.getLong(
    +    "kafkaConsumer.pollTimeoutMs",
    +    SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
    +
    +  private val maxOffsetsPerTrigger =
    +    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
    +
    +  /**
    +   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    +   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
    +   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    +   */
    +  private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
    +
    +  override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
    +    // Make sure initialPartitionOffsets is initialized
    +    initialPartitionOffsets
    +
    +    startPartitionOffsets = Option(start.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse(initialPartitionOffsets)
    +
    +    endPartitionOffsets = Option(end.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse {
    +          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
    +          maxOffsetsPerTrigger.map { maxOffsets =>
    +            rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
    +          }.getOrElse {
    +            latestPartitionOffsets
    +          }
    +        }
    +  }
    +
    +  override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
    +    // Find the new partitions, and get their earliest offsets
    +    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
    +    val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
    +    if (newPartitionOffsets.keySet != newPartitions) {
    +      // We cannot get from offsets for some partitions. It means they got deleted.
    +      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
    +      reportDataLoss(
    +        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
    +    }
    +    logInfo(s"Partitions added: $newPartitionOffsets")
    +    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
    +      reportDataLoss(
    +        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
    +    }
    +
    +    // Find deleted partitions, and report data loss if required
    +    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
    +    if (deletedPartitions.nonEmpty) {
    +      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
    +    }
    +
    +    // Use the until partitions to calculate offset ranges to ignore partitions that have
    +    // been deleted
    +    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
    +      // Ignore partitions that we don't know the from offsets.
    +      newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
    +    }.toSeq
    +    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
    +
    +    val sortedExecutors = getSortedExecutorList()
    +    val numExecutors = sortedExecutors.length
    +    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
    +
    +    // Calculate offset ranges
    +    val factories = topicPartitions.flatMap { tp =>
    +      val fromOffset = startPartitionOffsets.get(tp).getOrElse {
    +        newPartitionOffsets.getOrElse(
    +        tp, {
    +          // This should not happen since newPartitionOffsets contains all partitions not in
    +          // fromPartitionOffsets
    +          throw new IllegalStateException(s"$tp doesn't have a from offset")
    +        })
    +      }
    +      val untilOffset = endPartitionOffsets(tp)
    +
    +      if (untilOffset >= fromOffset) {
    +        // This allows cached KafkaConsumers in the executors to be re-used to read the same
    +        // partition in every batch.
    +        val preferredLoc = if (numExecutors > 0) {
    +          Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
    +        } else None
    +        val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
    +        Some(
    +          new KafkaMicroBatchDataReaderFactory(
    +            range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
    +      } else {
    +        reportDataLoss(
    +          s"Partition $tp's offset was changed from " +
    +            s"$fromOffset to $untilOffset, some data may have been missed")
    +        None
    +      }
    +    }
    +    factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
    +  }
    +
    +  override def getStartOffset: Offset = {
    +    KafkaSourceOffset(startPartitionOffsets)
    +  }
    +
    +  override def getEndOffset: Offset = {
    +    KafkaSourceOffset(endPartitionOffsets)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
    +  }
    +
    +  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
    +
    +  override def commit(end: Offset): Unit = {}
    +
    +  override def stop(): Unit = {}
    +
    +  override def toString(): String = s"Kafka[$kafkaOffsetReader]"
    +
    +  /**
    +   * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
    +   * the checkpoint.
    +   */
    +  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
    +    // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
    +    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
    +    // (KAFKA-1894).
    +    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
    +
    +    // SparkSession is required for getting Hadoop configuration for writing to checkpoints
    +    assert(SparkSession.getActiveSession.nonEmpty)
    +
    +    val metadataLog =
    +      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
    +    metadataLog.get(0).getOrElse {
    +      val offsets = startingOffsets match {
    +        case EarliestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
    +        case LatestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
    +        case SpecificOffsetRangeLimit(p) =>
    +          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
    +      }
    +      metadataLog.add(0, offsets)
    +      logInfo(s"Initial offsets: $offsets")
    +      offsets
    +    }.partitionToOffsets
    +  }
    +
    +  /** Proportionally distribute limit number of offsets among topicpartitions */
    +  private def rateLimit(
    +      limit: Long,
    +      from: PartitionOffsetMap,
    +      until: PartitionOffsetMap): PartitionOffsetMap = {
    +    val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
    +    val sizes = until.flatMap {
    +      case (tp, end) =>
    +        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
    +        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
    +          val size = end - begin
    +          logDebug(s"rateLimit $tp size is $size")
    +          if (size > 0) Some(tp -> size) else None
    +        }
    +    }
    +    val total = sizes.values.sum.toDouble
    +    if (total < 1) {
    +      until
    +    } else {
    +      until.map {
    +        case (tp, end) =>
    +          tp -> sizes.get(tp).map { size =>
    +            val begin = from.get(tp).getOrElse(fromNew(tp))
    +            val prorate = limit * (size / total)
    +            // Don't completely starve small topicpartitions
    +            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
    +            // Paranoia, make sure not to return an offset that's past end
    +            Math.min(end, off)
    +          }.getOrElse(end)
    +      }
    +    }
    +  }
    +
    +  private def getSortedExecutorList(): Array[String] = {
    --- End diff --
    
    Moved from `object KafkaSource` into this `KafkaMicroBatchReader` class so that the object can be removed completely.
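
For context, the sorted executor list feeds the preferred-location choice in `createDataReaderFactories` in the diff above: each topic partition is hashed onto a stable position in the list, so a cached consumer on that executor can be reused across batches. The sketch below isolates that step. `TopicPartitionStub` and `PreferredLocationSketch` are hypothetical names introduced here so that the snippet runs without Kafka on the classpath.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartitionStub(topic: String, partition: Int)

object PreferredLocationSketch {
  /** Picks a stable executor for a partition, or None when no executors are known. */
  def preferredLocation(
      tp: TopicPartitionStub,
      sortedExecutors: IndexedSeq[String]): Option[String] = {
    val numExecutors = sortedExecutors.length
    if (numExecutors > 0) {
      // floorMod keeps the index non-negative even when hashCode is negative.
      Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
    } else None
  }
}
```

The mapping stays stable only while the executor list is unchanged. When executors are added or removed, some partitions move to a different executor and lose their cached consumer.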


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    @zsxwing can you take another look? There are some more changes.
    @jose-torres PTAL.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87466/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    LGTM


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167809278
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
    @@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
         )
       }
     
    -  testWithUninterruptibleThread(
    --- End diff --
    
    Agreed. How about just writing a test to make sure we do write 0 at the beginning?


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87239 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87239/testReport)** for PR 20554 at commit [`05c9d20`](https://github.com/apache/spark/commit/05c9d20da4361d631d8839bd4a45e4966964afa0).


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167811474
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -28,50 +28,40 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
     
     import org.apache.spark.internal.Logging
     import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
    -import org.apache.spark.sql.execution.streaming.{Sink, Source}
    +import org.apache.spark.sql.execution.streaming.Sink
     import org.apache.spark.sql.sources._
    -import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
     import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
     import org.apache.spark.sql.streaming.OutputMode
     import org.apache.spark.sql.types.StructType
     
     /**
    - * The provider class for the [[KafkaSource]]. This provider is designed such that it throws
    + * The provider class for all Kafka readers and writers. It is designed such that it throws
      * IllegalArgumentException when the Kafka Dataset is created, so that it can catch
      * missing options even before the query is started.
      */
     private[kafka010] class KafkaSourceProvider extends DataSourceRegister
    -    with StreamSourceProvider
         with StreamSinkProvider
         with RelationProvider
         with CreatableRelationProvider
         with StreamWriteSupport
         with ContinuousReadSupport
    +    with MicroBatchReadSupport
         with Logging {
       import KafkaSourceProvider._
     
       override def shortName(): String = "kafka"
     
       /**
    -   * Returns the name and schema of the source. In addition, it also verifies whether the options
    -   * are correct and sufficient to create the [[KafkaSource]] when the query is started.
    +   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read batches
    +   * of Kafka data in a micro-batch streaming query.
        */
    -  override def sourceSchema(
    -      sqlContext: SQLContext,
    -      schema: Option[StructType],
    -      providerName: String,
    -      parameters: Map[String, String]): (String, StructType) = {
    -    validateStreamOptions(parameters)
    -    require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
    -    (shortName(), KafkaOffsetReader.kafkaSchema)
    -  }
    -
    -  override def createSource(
    -      sqlContext: SQLContext,
    +  def createMicroBatchReader(
    --- End diff --
    
    nit: `override`
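
As a brief illustration of why the nit matters: Scala lets a class implement an abstract member without the `override` modifier, but adding it makes the compiler reject the method if its signature ever stops matching the interface. The trait and class below are hypothetical stand-ins, not the real `MicroBatchReadSupport` API.

```scala
// Hypothetical stand-in for a read-support interface; not the real Spark API.
trait ReadSupportStub {
  def createReader(checkpointLocation: String): String
}

final class ProviderSketch extends ReadSupportStub {
  // With `override`, the compiler reports an error if this method
  // no longer matches any member of ReadSupportStub.
  override def createReader(checkpointLocation: String): String =
    s"reader@$checkpointLocation"
}
```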


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Build finished. Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/893/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/927/
    Test PASSed.


---



[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167123580
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.io._
    +import java.nio.charset.StandardCharsets
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.kafka.common.TopicPartition
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler.ExecutorCacheTaskLocation
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
    +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.UninterruptibleThread
    +
    +/**
    + * A [[MicroBatchReader]] that reads data from Kafka.
    + *
    + * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
    + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
    + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
    + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it consistent
    + * with the semantics of `KafkaConsumer.position()`.
    + *
    + * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical, the user
    + * must make sure all messages in a topic have been processed before deleting the topic.
    + *
    + * There is a known issue caused by KAFKA-1894: a query using Kafka may fail to stop.
    + * To avoid this issue, make sure to stop the query before stopping the Kafka brokers,
    + * and do not use wrong broker addresses.
    + */
    +private[kafka010] class KafkaMicroBatchReader(
    +    kafkaOffsetReader: KafkaOffsetReader,
    +    executorKafkaParams: ju.Map[String, Object],
    +    options: DataSourceOptions,
    +    metadataPath: String,
    +    startingOffsets: KafkaOffsetRangeLimit,
    +    failOnDataLoss: Boolean)
    +  extends MicroBatchReader with Logging {
    +
    +  type PartitionOffsetMap = Map[TopicPartition, Long]
    +
    +  private var startPartitionOffsets: PartitionOffsetMap = _
    +  private var endPartitionOffsets: PartitionOffsetMap = _
    +
    +  private val pollTimeoutMs = options.getLong(
    +    "kafkaConsumer.pollTimeoutMs",
    +    SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
    +
    +  private val maxOffsetsPerTrigger =
    +    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
    +
    +  /**
    +   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    +   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
    +   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    +   */
    +  private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
    +
    +  override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
    +    // Make sure initialPartitionOffsets is initialized
    +    initialPartitionOffsets
    +
    +    startPartitionOffsets = Option(start.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse(initialPartitionOffsets)
    +
    +    endPartitionOffsets = Option(end.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse {
    +          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
    +          maxOffsetsPerTrigger.map { maxOffsets =>
    +            rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
    +          }.getOrElse {
    +            latestPartitionOffsets
    +          }
    +        }
    +  }
    +
    +  override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
    +    // Find the new partitions, and get their earliest offsets
    +    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
    +    val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
    +    if (newPartitionOffsets.keySet != newPartitions) {
    +      // We cannot get from offsets for some partitions. It means they got deleted.
    +      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
    +      reportDataLoss(
    +        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
    +    }
    +    logInfo(s"Partitions added: $newPartitionOffsets")
    +    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
    +      reportDataLoss(
    +        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
    +    }
    +
    +    // Find deleted partitions, and report data loss if required
    +    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
    +    if (deletedPartitions.nonEmpty) {
    +      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
    +    }
    +
    +    // Use the until partitions to calculate offset ranges to ignore partitions that have
    +    // been deleted
    +    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
    +      // Ignore partitions whose starting offsets are unknown.
    +      newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
    +    }.toSeq
    +    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
    +
    +    val sortedExecutors = getSortedExecutorList()
    +    val numExecutors = sortedExecutors.length
    +    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
    +
    +    // Calculate offset ranges
    +    val factories = topicPartitions.flatMap { tp =>
    +      val fromOffset = startPartitionOffsets.get(tp).getOrElse {
    +        newPartitionOffsets.getOrElse(
    +        tp, {
    +          // This should not happen since newPartitionOffsets contains all partitions not in
    +          // startPartitionOffsets
    +          throw new IllegalStateException(s"$tp doesn't have a from offset")
    +        })
    +      }
    +      val untilOffset = endPartitionOffsets(tp)
    +
    +      if (untilOffset >= fromOffset) {
    +        // This allows cached KafkaConsumers in the executors to be re-used to read the same
    +        // partition in every batch.
    +        val preferredLoc = if (numExecutors > 0) {
    +          Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
    +        } else None
    +        val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
    +        Some(
    +          new KafkaMicroBatchDataReaderFactory(
    +            range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
    +      } else {
    +        reportDataLoss(
    +          s"Partition $tp's offset was changed from " +
    +            s"$fromOffset to $untilOffset, some data may have been missed")
    +        None
    +      }
    +    }
    +    factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
    +  }
    +
    +  override def getStartOffset: Offset = {
    +    KafkaSourceOffset(startPartitionOffsets)
    +  }
    +
    +  override def getEndOffset: Offset = {
    +    KafkaSourceOffset(endPartitionOffsets)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
    +  }
    +
    +  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
    +
    +  override def commit(end: Offset): Unit = {}
    +
    +  override def stop(): Unit = {}
    +
    +  override def toString(): String = s"Kafka[$kafkaOffsetReader]"
    +
    +  /**
    +   * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
    +   * the checkpoint.
    +   */
    +  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
    +    // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
    +    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
    +    // (KAFKA-1894).
    +    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
    +
    +    // SparkSession is required for getting Hadoop configuration for writing to checkpoints
    +    assert(SparkSession.getActiveSession.nonEmpty)
    +
    +    val metadataLog =
    +      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
    +    metadataLog.get(0).getOrElse {
    +      val offsets = startingOffsets match {
    +        case EarliestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
    +        case LatestOffsetRangeLimit =>
    +          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
    +        case SpecificOffsetRangeLimit(p) =>
    +          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
    +      }
    +      metadataLog.add(0, offsets)
    +      logInfo(s"Initial offsets: $offsets")
    +      offsets
    +    }.partitionToOffsets
    +  }
    +
    +  /** Proportionally distribute limit number of offsets among topicpartitions */
    +  private def rateLimit(
    +      limit: Long,
    +      from: PartitionOffsetMap,
    +      until: PartitionOffsetMap): PartitionOffsetMap = {
    +    val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
    +    val sizes = until.flatMap {
    +      case (tp, end) =>
    +        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
    +        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
    +          val size = end - begin
    +          logDebug(s"rateLimit $tp size is $size")
    +          if (size > 0) Some(tp -> size) else None
    +        }
    +    }
    +    val total = sizes.values.sum.toDouble
    +    if (total < 1) {
    +      until
    +    } else {
    +      until.map {
    +        case (tp, end) =>
    +          tp -> sizes.get(tp).map { size =>
    +            val begin = from.get(tp).getOrElse(fromNew(tp))
    +            val prorate = limit * (size / total)
    +            // Don't completely starve small topicpartitions
    +            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
    +            // Paranoia, make sure not to return an offset that's past end
    +            Math.min(end, off)
    +          }.getOrElse(end)
    +      }
    +    }
    +  }
    +
    +  private def getSortedExecutorList(): Array[String] = {
    +
    +    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
    +      if (a.host == b.host) {
    +        a.executorId > b.executorId
    +      } else {
    +        a.host > b.host
    +      }
    +    }
    +
    +    val bm = SparkEnv.get.blockManager
    +    bm.master.getPeers(bm.blockManagerId).toArray
    +      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
    +      .sortWith(compare)
    +      .map(_.toString)
    +  }
    +
    +  /**
    +   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
    +   * Otherwise, just log a warning.
    +   */
    +  private def reportDataLoss(message: String): Unit = {
    +    if (failOnDataLoss) {
    +      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
    +    } else {
    +      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
    +    }
    +  }
    +
    +  /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
    +  class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
    +    extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
    +
    +    val VERSION = 1
    +
    +    override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
    +      out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
    +      val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
    +      writer.write("v" + VERSION + "\n")
    +      writer.write(metadata.json)
    +      writer.flush
    +    }
    +
    +    override def deserialize(in: InputStream): KafkaSourceOffset = {
    +      in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
    +      val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
    +      // HDFSMetadataLog guarantees that it never creates a partial file.
    +      assert(content.length != 0)
    +      if (content(0) == 'v') {
    +        val indexOfNewLine = content.indexOf("\n")
    +        if (indexOfNewLine > 0) {
    +          val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
    +          KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
    +        } else {
    +          throw new IllegalStateException(
    +            s"Log file was malformed: failed to detect the log file version line.")
    +        }
    +      } else {
    +        // The log was generated by Spark 2.1.0
    +        KafkaSourceOffset(SerializedOffset(content))
    +      }
    +    }
    +  }
    +}
    +
    +/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
    +private[kafka010] class KafkaMicroBatchDataReaderFactory(
    +    range: KafkaOffsetRange,
    +    preferredLoc: Option[String],
    +    executorKafkaParams: ju.Map[String, Object],
    +    pollTimeoutMs: Long,
    +    failOnDataLoss: Boolean) extends DataReaderFactory[Row] {
    +
    +  override def preferredLocations(): Array[String] = preferredLoc.toArray
    +
    +  override def createDataReader(): DataReader[Row] = new KafkaMicroBatchDataReader(
    +    range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
    +}
    +
    +/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
    +private[kafka010] class KafkaMicroBatchDataReader(
    +    offsetRange: KafkaOffsetRange,
    +    executorKafkaParams: ju.Map[String, Object],
    +    pollTimeoutMs: Long,
    +    failOnDataLoss: Boolean) extends DataReader[Row] with Logging {
    +
    +  private val consumer = CachedKafkaConsumer.getOrCreate(
    +    offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
    +  private val rangeToRead = resolveRange(offsetRange)
    +  private val timestamp = new java.sql.Timestamp(0)
    +
    +  private var nextOffset = rangeToRead.fromOffset
    +  private var nextRow: Row = _
    +
    +  override def next(): Boolean = {
    +    if (nextOffset < rangeToRead.untilOffset) {
    +      val cr = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
    +      if (cr != null) {
    +        timestamp.setTime(cr.timestamp)
    +        nextRow = Row(
    +          cr.key,
    +          cr.value,
    +          cr.topic,
    +          cr.partition,
    +          cr.offset,
    +          timestamp,
    +          cr.timestampType.id)
    +        true
    +      } else {
    +        false
    +      }
    +    } else {
    +      false
    +    }
    +  }
    +
    +  override def get(): Row = {
    +    assert(nextRow != null)
    +    nextOffset += 1
    +    nextRow
    +  }
    +
    +  override def close(): Unit = {
    +    // Indicate that we're no longer using this consumer
    +    CachedKafkaConsumer.releaseKafkaConsumer(
    +      offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
    +  }
    +
    +  private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
    --- End diff --
    
    Same method as this KafkaSourceRDD method.
    https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala#L187
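
    For readers following the quoted diff, the proportional split performed by `rateLimit` can be sketched in isolation. This is a minimal, dependency-free illustration and not the code in the PR: `TP` is a hypothetical stand-in for Kafka's `TopicPartition`, and partitions missing from `from` simply keep their end offset here, whereas the diff looks up their earliest offsets through `kafkaOffsetReader`.

```scala
object RateLimitSketch {
  // Hypothetical stand-in for Kafka's TopicPartition, so the sketch has no dependencies.
  final case class TP(topic: String, partition: Int)
  type Offsets = Map[TP, Long]

  /** Split `limit` offsets across partitions in proportion to each partition's backlog. */
  def rateLimit(limit: Long, from: Offsets, until: Offsets): Offsets = {
    // Backlog per partition; partitions with no known start or no new data are skipped.
    val sizes: Map[TP, Long] = (for {
      (tp, end) <- until.toSeq
      begin <- from.get(tp).toSeq
      if end - begin > 0
    } yield tp -> (end - begin)).toMap

    val total = sizes.values.sum.toDouble
    if (total < 1) {
      until
    } else {
      until.map { case (tp, end) =>
        tp -> sizes.get(tp).map { size =>
          val prorate = limit * (size / total)
          // Round shares below 1 up so that small partitions still advance.
          val step = (if (prorate < 1) math.ceil(prorate) else math.floor(prorate)).toLong
          // Never return an offset past the end of the partition.
          math.min(end, from(tp) + step)
        }.getOrElse(end)
      }
    }
  }
}
```

    With a limit of 8 and backlogs of 75 and 25 offsets, the two partitions advance by 6 and 2 offsets respectively. With a limit of 4 and backlogs of 1023 and 1, the small partition still advances by 1 because its share is rounded up.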


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/928/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87445 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87445/testReport)** for PR 20554 at commit [`5cd5a4c`](https://github.com/apache/spark/commit/5cd5a4cad7c8ccca1a5bfa7b11fa04f939a15015).


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87239/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87433 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87433/testReport)** for PR 20554 at commit [`f9983d9`](https://github.com/apache/spark/commit/f9983d937b97b2ba9f020f370b8aefdd353a654b).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87498/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87445 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87445/testReport)** for PR 20554 at commit [`5cd5a4c`](https://github.com/apache/spark/commit/5cd5a4cad7c8ccca1a5bfa7b11fa04f939a15015).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87466 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87466/testReport)** for PR 20554 at commit [`ba4724b`](https://github.com/apache/spark/commit/ba4724bfbae88266c2b565c709e1f4ff4794ef57).


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    LGTM again


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87445/
    Test FAILed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87498 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87498/testReport)** for PR 20554 at commit [`2dea08a`](https://github.com/apache/spark/commit/2dea08a4c5f85991e4ad4c7da886c2e0bf456bb8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/733/
    Test PASSed.


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87465 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87465/testReport)** for PR 20554 at commit [`66f3ef2`](https://github.com/apache/spark/commit/66f3ef2d70bf9e7468bd28ee4a4f34c35726fd98).


---



[GitHub] spark issue #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source to v2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20554
  
    **[Test build #87433 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87433/testReport)** for PR 20554 at commit [`f9983d9`](https://github.com/apache/spark/commit/f9983d937b97b2ba9f020f370b8aefdd353a654b).


---
