You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/10/27 17:31:02 UTC

spark git commit: [SPARK-17813][SQL][KAFKA] Maximum data per trigger

Repository: spark
Updated Branches:
  refs/heads/master 701a9d361 -> 104232580


[SPARK-17813][SQL][KAFKA] Maximum data per trigger

## What changes were proposed in this pull request?

maxOffsetsPerTrigger option for rate limiting, proportionally based on volume of different topicpartitions.

## How was this patch tested?

Added unit test

Author: cody koeninger <co...@koeninger.org>

Closes #15527 from koeninger/SPARK-17813.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10423258
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10423258
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10423258

Branch: refs/heads/master
Commit: 104232580528c097a284d753adb5795f6de8b0a5
Parents: 701a9d3
Author: cody koeninger <co...@koeninger.org>
Authored: Thu Oct 27 10:30:59 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Thu Oct 27 10:30:59 2016 -0700

----------------------------------------------------------------------
 docs/structured-streaming-kafka-integration.md  |   6 ++
 .../apache/spark/sql/kafka010/KafkaSource.scala | 107 ++++++++++++++-----
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |  71 +++++++++++-
 3 files changed, 157 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/10423258/docs/structured-streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index e851f21..a6c3b3a 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -221,6 +221,12 @@ The following configurations are optional:
   <td>10</td>
   <td>milliseconds to wait before retrying to fetch Kafka offsets</td>
 </tr>
+<tr>
+  <td>maxOffsetsPerTrigger</td>
+  <td>long</td>
+  <td>none</td>
+  <td>Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td>
+</tr>
 </table>
 
 Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g, 

http://git-wip-us.apache.org/repos/asf/spark/blob/10423258/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 537b7b0..61cba73 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -96,6 +96,9 @@ private[kafka010] case class KafkaSource(
   private val offsetFetchAttemptIntervalMs =
     sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
 
+  private val maxOffsetsPerTrigger =
+    sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
   /**
    * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
    * offsets and never commits them.
@@ -121,6 +124,8 @@ private[kafka010] case class KafkaSource(
     }.partitionToOffsets
   }
 
+  private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
+
   override def schema: StructType = KafkaSource.kafkaSchema
 
   /** Returns the maximum available offset for this source. */
@@ -128,9 +133,54 @@ private[kafka010] case class KafkaSource(
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
-    val offset = KafkaSourceOffset(fetchLatestOffsets())
-    logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
-    Some(offset)
+    val latest = fetchLatestOffsets()
+    val offsets = maxOffsetsPerTrigger match {
+      case None =>
+        latest
+      case Some(limit) if currentPartitionOffsets.isEmpty =>
+        rateLimit(limit, initialPartitionOffsets, latest)
+      case Some(limit) =>
+        rateLimit(limit, currentPartitionOffsets.get, latest)
+    }
+
+    currentPartitionOffsets = Some(offsets)
+    logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
+    Some(KafkaSourceOffset(offsets))
+  }
+
+  /** Proportionally distribute limit number of offsets among topicpartitions */
+  private def rateLimit(
+      limit: Long,
+      from: Map[TopicPartition, Long],
+      until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+    val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    val sizes = until.flatMap {
+      case (tp, end) =>
+        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+          val size = end - begin
+          logDebug(s"rateLimit $tp size is $size")
+          if (size > 0) Some(tp -> size) else None
+        }
+    }
+    val total = sizes.values.sum.toDouble
+    if (total < 1) {
+      until
+    } else {
+      until.map {
+        case (tp, end) =>
+          tp -> sizes.get(tp).map { size =>
+            val begin = from.get(tp).getOrElse(fromNew(tp))
+            val prorate = limit * (size / total)
+            logDebug(s"rateLimit $tp prorated amount is $prorate")
+            // Don't completely starve small topicpartitions
+            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            logDebug(s"rateLimit $tp new offset is $off")
+            // Paranoia, make sure not to return an offset that's past end
+            Math.min(end, off)
+          }.getOrElse(end)
+      }
+    }
   }
 
   /**
@@ -153,11 +203,7 @@ private[kafka010] case class KafkaSource(
 
     // Find the new partitions, and get their earliest offsets
     val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
-    val newPartitionOffsets = if (newPartitions.nonEmpty) {
-      fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
-    } else {
-      Map.empty[TopicPartition, Long]
-    }
+    val newPartitionOffsets = fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
     if (newPartitionOffsets.keySet != newPartitions) {
       // We cannot get from offsets for some partitions. It means they got deleted.
       val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
@@ -221,6 +267,12 @@ private[kafka010] case class KafkaSource(
 
     logInfo("GetBatch generating RDD of offset range: " +
       offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
+
+    // On recovery, getBatch will get called before getOffset
+    if (currentPartitionOffsets.isEmpty) {
+      currentPartitionOffsets = Some(untilPartitionOffsets)
+    }
+
     sqlContext.createDataFrame(rdd, schema)
   }
 
@@ -305,23 +357,28 @@ private[kafka010] case class KafkaSource(
    * some partitions if they are deleted.
    */
   private def fetchNewPartitionEarliestOffsets(
-      newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
-    // Poll to get the latest assigned partitions
-    consumer.poll(0)
-    val partitions = consumer.assignment()
-    consumer.pause(partitions)
-    logDebug(s"\tPartitions assigned to consumer: $partitions")
-
-    // Get the earliest offset of each partition
-    consumer.seekToBeginning(partitions)
-    val partitionOffsets = newPartitions.filter { p =>
-      // When deleting topics happen at the same time, some partitions may not be in `partitions`.
-      // So we need to ignore them
-      partitions.contains(p)
-    }.map(p => p -> consumer.position(p)).toMap
-    logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
-    partitionOffsets
-  }
+      newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] =
+    if (newPartitions.isEmpty) {
+      Map.empty[TopicPartition, Long]
+    } else {
+      withRetriesWithoutInterrupt {
+        // Poll to get the latest assigned partitions
+        consumer.poll(0)
+        val partitions = consumer.assignment()
+        consumer.pause(partitions)
+        logDebug(s"\tPartitions assigned to consumer: $partitions")
+
+        // Get the earliest offset of each partition
+        consumer.seekToBeginning(partitions)
+        val partitionOffsets = newPartitions.filter { p =>
+          // When deleting topics happen at the same time, some partitions may not be in
+          // `partitions`. So we need to ignore them
+          partitions.contains(p)
+        }.map(p => p -> consumer.position(p)).toMap
+        logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
+        partitionOffsets
+      }
+    }
 
   /**
    * Helper function that does multiple retries on the a body of code that returns offsets.

http://git-wip-us.apache.org/repos/asf/spark/blob/10423258/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index b50688e..ed4cc75 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -23,13 +23,14 @@ import scala.util.Random
 
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.streaming.{ ProcessingTime, StreamTest }
 import org.apache.spark.sql.test.SharedSQLContext
 
-
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
 
   protected var testUtils: KafkaTestUtils = _
@@ -133,6 +134,72 @@ class KafkaSourceSuite extends KafkaSourceTest {
 
   private val topicId = new AtomicInteger(0)
 
+  test("maxOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 10)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+
+    testStream(mapped)(
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // 1 from smallest, 1 from middle, 8 from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smallest now empty, 1 more from middle, 9 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116
+      ),
+      StopStream,
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smallest now empty, 1 more from middle, 9 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 121, 122, 123, 124, 125
+      ),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smallest now empty, 1 more from middle, 9 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
+        13, 126, 127, 128, 129, 130, 131, 132, 133, 134
+      )
+    )
+  }
+
   test("cannot stop Kafka stream") {
     val topic = newTopic()
     testUtils.createTopic(newTopic(), partitions = 5)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org