Posted to reviews@spark.apache.org by koeninger <gi...@git.apache.org> on 2018/08/03 04:29:02 UTC
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user koeninger commented on a diff in the pull request:
https://github.com/apache/spark/pull/21917#discussion_r207437645
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V](
}.getOrElse(offsets)
}
- override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
- val untilOffsets = clamp(latestOffsets())
- val offsetRanges = untilOffsets.map { case (tp, uo) =>
- val fo = currentOffsets(tp)
- OffsetRange(tp.topic, tp.partition, fo, uo)
+ /**
+ * Return the offset ranges. For non-consecutive offsets, the last offset must hold a record.
+ * If offsets have missing data (transaction markers or aborted records), widen the
+ * range until we reach the requested number of records or no more records remain.
+ * Because we have to iterate over all the records in this case,
+ * we also return the total number of records.
+ * @param offsets the target ranges we would like if the offsets were consecutive
+ * @return the updated offset ranges together with the total number of records
+ */
+ private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = {
+ if (nonConsecutive) {
+ val localRw = rewinder()
+ val localOffsets = currentOffsets
+ context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => {
+ tpos.map { case (tp, o) =>
+ val offsetAndCount = localRw.getLastOffsetAndCount(localOffsets(tp), tp, o)
+ (tp, offsetAndCount)
+ }
+ }).collect()
--- End diff --
What exactly is the benefit gained by doing a duplicate read of all the messages?
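To make the concern concrete, here is a minimal, self-contained sketch (not the PR's actual helper; `Entry` and `lastOffsetAndCount` are hypothetical names) of why aligning ranges on a topic with non-consecutive offsets forces a scan: transaction markers occupy offsets but carry no data, so the only way to know how many real records fall in a range is to read forward through every entry, including ones the job will read again later.

```scala
// Hypothetical model of a log segment: each entry occupies one offset,
// but control records (transaction commit/abort markers) carry no data.
case class Entry(offset: Long, isControlRecord: Boolean)

// Walk forward from `from`, counting data records until we have `wanted`
// of them or the log is exhausted. Returns (exclusive end offset, count).
// Note the full scan: every entry in the range is touched, which is the
// "duplicate read" the review comment asks about.
def lastOffsetAndCount(log: Seq[Entry], from: Long, wanted: Long): (Long, Long) = {
  var count = 0L
  var end = from
  val it = log.iterator.filter(_.offset >= from)
  while (it.hasNext && count < wanted) {
    val e = it.next()
    end = e.offset + 1                     // exclusive end of the range so far
    if (!e.isControlRecord) count += 1     // markers advance the offset only
  }
  (end, count)
}
```

With a marker at offset 2, asking for 3 records from offset 0 must extend the range past the marker, ending at exclusive offset 4 with 3 data records counted.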
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org