You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/10/21 22:55:07 UTC

spark git commit: [SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured stream

Repository: spark
Updated Branches:
  refs/heads/master 140570252 -> 268ccb9a4


[SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured stream

## What changes were proposed in this pull request?

`startingOffsets` takes specific per-TopicPartition offsets as a json argument, usable with any consumer strategy.

Adds `assign`, which takes specific TopicPartitions as a json argument, as a consumer strategy.

## How was this patch tested?

Unit tests

Author: cody koeninger <co...@koeninger.org>

Closes #15504 from koeninger/SPARK-17812.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/268ccb9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/268ccb9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/268ccb9a

Branch: refs/heads/master
Commit: 268ccb9a48dfefc4d7bc85155e7e20a2dfe89307
Parents: 1405702
Author: cody koeninger <co...@koeninger.org>
Authored: Fri Oct 21 15:55:04 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Oct 21 15:55:04 2016 -0700

----------------------------------------------------------------------
 docs/structured-streaming-kafka-integration.md  |  38 +++++--
 .../apache/spark/sql/kafka010/JsonUtils.scala   |  93 +++++++++++++++
 .../apache/spark/sql/kafka010/KafkaSource.scala |  64 +++++++++--
 .../sql/kafka010/KafkaSourceProvider.scala      |  52 ++++-----
 .../spark/sql/kafka010/StartingOffsets.scala    |  32 ++++++
 .../spark/sql/kafka010/JsonUtilsSuite.scala     |  45 ++++++++
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 114 +++++++++++++++++--
 .../spark/sql/kafka010/KafkaTestUtils.scala     |  14 ++-
 8 files changed, 395 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/268ccb9a/docs/structured-streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 668489a..e851f21 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -151,15 +151,24 @@ The following options must be set for the Kafka source.
 <table class="table">
 <tr><th>Option</th><th>value</th><th>meaning</th></tr>
 <tr>
+  <td>assign</td>
+  <td>json string {"topicA":[0,1],"topicB":[2,4]}</td>
+  <td>Specific TopicPartitions to consume.
+  Only one of "assign", "subscribe" or "subscribePattern"
+  options can be specified for Kafka source.</td>
+</tr>
+<tr>
   <td>subscribe</td>
   <td>A comma-separated list of topics</td>
-  <td>The topic list to subscribe. Only one of "subscribe" and "subscribePattern" options can be
-  specified for Kafka source.</td>
+  <td>The topic list to subscribe.
+  Only one of "assign", "subscribe" or "subscribePattern"
+  options can be specified for Kafka source.</td>
 </tr>
 <tr>
   <td>subscribePattern</td>
   <td>Java regex string</td>
-  <td>The pattern used to subscribe the topic. Only one of "subscribe" and "subscribePattern"
+  <td>The pattern used to subscribe to topic(s).
+  Only one of "assign", "subscribe" or "subscribePattern"
   options can be specified for Kafka source.</td>
 </tr>
 <tr>
@@ -174,16 +183,21 @@ The following configurations are optional:
 <table class="table">
 <tr><th>Option</th><th>value</th><th>default</th><th>meaning</th></tr>
 <tr>
-  <td>startingOffset</td>
-  <td>["earliest", "latest"]</td>
-  <td>"latest"</td>
-  <td>The start point when a query is started, either "earliest" which is from the earliest offset, 
-  or "latest" which is just from the latest offset. Note: This only applies when a new Streaming q
-  uery is started, and that resuming will always pick up from where the query left off.</td>
+  <td>startingOffsets</td>
+  <td>earliest, latest, or json string
+  {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}
+  </td>
+  <td>latest</td>
+  <td>The start point when a query is started, either "earliest" which is from the earliest offsets,
+  "latest" which is just from the latest offsets, or a json string specifying a starting offset for
+  each TopicPartition.  In the json, -2 as an offset can be used to refer to earliest, -1 to latest.
+  Note: This only applies when a new streaming query is started; resuming a query will always pick
+  up from where the query left off. Newly discovered partitions during a query will start at
+  earliest.</td>
 </tr>
 <tr>
   <td>failOnDataLoss</td>
-  <td>[true, false]</td>
+  <td>true or false</td>
   <td>true</td>
   <td>Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or 
   offsets are out of range). This may be a false alarm. You can disable it when it doesn't work
@@ -215,10 +229,10 @@ Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.
 
 Note that the following Kafka params cannot be set and the Kafka source will throw an exception:
 - **group.id**: Kafka source will create a unique group id for each query automatically.
-- **auto.offset.reset**: Set the source option `startingOffset` to `earliest` or `latest` to specify
+- **auto.offset.reset**: Set the source option `startingOffsets` to specify
  where to start instead. Structured Streaming manages which offsets are consumed internally, rather 
  than rely on the kafka Consumer to do it. This will ensure that no data is missed when when new 
- topics/partitions are dynamically subscribed. Note that `startingOffset` only applies when a new
+ topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new
  Streaming query is started, and that resuming will always pick up from where the query left off.
 - **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use 
  DataFrame operations to explicitly deserialize the keys.

http://git-wip-us.apache.org/repos/asf/spark/blob/268ccb9a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
new file mode 100644
index 0000000..40d568a
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io.Writer
+
+import scala.collection.mutable.HashMap
+import scala.util.control.NonFatal
+
+import org.apache.kafka.common.TopicPartition
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Utilities for converting Kafka related objects to and from json.
+ */
+private object JsonUtils {
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  /**
+   * Read TopicPartitions from json string
+   */
+  def partitions(str: String): Array[TopicPartition] = {
+    try {
+      Serialization.read[Map[String, Seq[Int]]](str).flatMap {  case (topic, parts) =>
+          parts.map { part =>
+            new TopicPartition(topic, part)
+          }
+      }.toArray
+    } catch {
+      case NonFatal(x) =>
+        throw new IllegalArgumentException(
+          s"""Expected e.g. {"topicA":[0,1],"topicB":[0,1]}, got $str""")
+    }
+  }
+
+  /**
+   * Write TopicPartitions as json string
+   */
+  def partitions(partitions: Iterable[TopicPartition]): String = {
+    val result = new HashMap[String, List[Int]]
+    partitions.foreach { tp =>
+      val parts: List[Int] = result.getOrElse(tp.topic, Nil)
+      result += tp.topic -> (tp.partition::parts)
+    }
+    Serialization.write(result)
+  }
+
+  /**
+   * Read per-TopicPartition offsets from json string
+   */
+  def partitionOffsets(str: String): Map[TopicPartition, Long] = {
+    try {
+      Serialization.read[Map[String, Map[Int, Long]]](str).flatMap { case (topic, partOffsets) =>
+          partOffsets.map { case (part, offset) =>
+              new TopicPartition(topic, part) -> offset
+          }
+      }.toMap
+    } catch {
+      case NonFatal(x) =>
+        throw new IllegalArgumentException(
+          s"""Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got $str""")
+    }
+  }
+
+  /**
+   * Write per-TopicPartition offsets as json string
+   */
+  def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
+    val result = new HashMap[String, HashMap[Int, Long]]()
+    partitionOffsets.foreach { case (tp, off) =>
+        val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
+        parts += tp.partition -> off
+        result += tp.topic -> parts
+    }
+    Serialization.write(result)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/268ccb9a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 4b0bb0a..537b7b0 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -22,7 +22,7 @@ import java.{util => ju}
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.TopicPartition
 
@@ -82,7 +82,7 @@ private[kafka010] case class KafkaSource(
     executorKafkaParams: ju.Map[String, Object],
     sourceOptions: Map[String, String],
     metadataPath: String,
-    startFromEarliestOffset: Boolean,
+    startingOffsets: StartingOffsets,
     failOnDataLoss: Boolean)
   extends Source with Logging {
 
@@ -110,10 +110,10 @@ private[kafka010] case class KafkaSource(
   private lazy val initialPartitionOffsets = {
     val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
     metadataLog.get(0).getOrElse {
-      val offsets = if (startFromEarliestOffset) {
-        KafkaSourceOffset(fetchEarliestOffsets())
-      } else {
-        KafkaSourceOffset(fetchLatestOffsets())
+      val offsets = startingOffsets match {
+        case EarliestOffsets => KafkaSourceOffset(fetchEarliestOffsets())
+        case LatestOffsets => KafkaSourceOffset(fetchLatestOffsets())
+        case SpecificOffsets(p) => KafkaSourceOffset(fetchSpecificStartingOffsets(p))
       }
       metadataLog.add(0, offsets)
       logInfo(s"Initial offsets: $offsets")
@@ -232,6 +232,43 @@ private[kafka010] case class KafkaSource(
   override def toString(): String = s"KafkaSource[$consumerStrategy]"
 
   /**
+   * Set consumer position to specified offsets, making sure all assignments are set.
+   */
+  private def fetchSpecificStartingOffsets(
+      partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+    val result = withRetriesWithoutInterrupt {
+      // Poll to get the latest assigned partitions
+      consumer.poll(0)
+      val partitions = consumer.assignment()
+      consumer.pause(partitions)
+      assert(partitions.asScala == partitionOffsets.keySet,
+        "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
+          "Use -1 for latest, -2 for earliest, if you don't care.\n" +
+          s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
+      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")
+
+      partitionOffsets.foreach {
+        case (tp, -1) => consumer.seekToEnd(ju.Arrays.asList(tp))
+        case (tp, -2) => consumer.seekToBeginning(ju.Arrays.asList(tp))
+        case (tp, off) => consumer.seek(tp, off)
+      }
+      partitionOffsets.map {
+        case (tp, _) => tp -> consumer.position(tp)
+      }
+    }
+    partitionOffsets.foreach {
+      case (tp, off) if off != -1 && off != -2 =>
+        if (result(tp) != off) {
+          reportDataLoss(
+            s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
+        }
+      case _ =>
+        // no real way to check that beginning or end is reasonable
+    }
+    result
+  }
+
+  /**
    * Fetch the earliest offsets of partitions.
    */
   private def fetchEarliestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
@@ -273,7 +310,7 @@ private[kafka010] case class KafkaSource(
     consumer.poll(0)
     val partitions = consumer.assignment()
     consumer.pause(partitions)
-    logDebug(s"\tPartitioned assigned to consumer: $partitions")
+    logDebug(s"\tPartitions assigned to consumer: $partitions")
 
     // Get the earliest offset of each partition
     consumer.seekToBeginning(partitions)
@@ -317,6 +354,8 @@ private[kafka010] case class KafkaSource(
               try {
                 result = Some(body)
               } catch {
+                case x: OffsetOutOfRangeException =>
+                  reportDataLoss(x.getMessage)
                 case NonFatal(e) =>
                   lastException = e
                   logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e)
@@ -373,6 +412,17 @@ private[kafka010] object KafkaSource {
     def createConsumer(): Consumer[Array[Byte], Array[Byte]]
   }
 
+  case class AssignStrategy(partitions: Array[TopicPartition], kafkaParams: ju.Map[String, Object])
+    extends ConsumerStrategy {
+    override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = {
+      val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+      consumer.assign(ju.Arrays.asList(partitions: _*))
+      consumer
+    }
+
+    override def toString: String = s"Assign[${partitions.mkString(", ")}]"
+  }
+
   case class SubscribeStrategy(topics: Seq[String], kafkaParams: ju.Map[String, Object])
     extends ConsumerStrategy {
     override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/268ccb9a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 23b1b60..585ced8 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -77,14 +77,12 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
     // id. Hence, we should generate a unique id for each query.
     val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
 
-    val startFromEarliestOffset =
-      caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY).map(_.trim.toLowerCase) match {
-        case Some("latest") => false
-        case Some("earliest") => true
-        case Some(pos) =>
-          // This should not happen since we have already checked the options.
-          throw new IllegalStateException(s"Invalid $STARTING_OFFSET_OPTION_KEY: $pos")
-        case None => false
+    val startingOffsets =
+      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
+        case Some("latest") => LatestOffsets
+        case Some("earliest") => EarliestOffsets
+        case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
+        case None => LatestOffsets
       }
 
     val kafkaParamsForStrategy =
@@ -95,9 +93,9 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
         // So that consumers in Kafka source do not mess with any existing group id
         .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver")
 
-        // Set to "latest" to avoid exceptions. However, KafkaSource will fetch the initial offsets
-        // by itself instead of counting on KafkaConsumer.
-        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+        // Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial
+        // offsets by itself instead of counting on KafkaConsumer.
+        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
 
         // So that consumers in the driver does not commit offsets unnecessarily
         .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -130,6 +128,10 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
         .build()
 
     val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
+      case ("assign", value) =>
+        AssignStrategy(
+          JsonUtils.partitions(value),
+          kafkaParamsForStrategy)
       case ("subscribe", value) =>
         SubscribeStrategy(
           value.split(",").map(_.trim()).filter(_.nonEmpty),
@@ -153,7 +155,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
       kafkaParamsForExecutors,
       parameters,
       metadataPath,
-      startFromEarliestOffset,
+      startingOffsets,
       failOnDataLoss)
   }
 
@@ -175,6 +177,13 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
     }
 
     val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
+      case ("assign", value) =>
+        if (!value.trim.startsWith("{")) {
+          throw new IllegalArgumentException(
+            "No topicpartitions to assign as specified value for option " +
+              s"'assign' is '$value'")
+        }
+
       case ("subscribe", value) =>
         val topics = value.split(",").map(_.trim).filter(_.nonEmpty)
         if (topics.isEmpty) {
@@ -195,14 +204,6 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
         throw new IllegalArgumentException("Unknown option")
     }
 
-    caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY) match {
-      case Some(pos) if !STARTING_OFFSET_OPTION_VALUES.contains(pos.trim.toLowerCase) =>
-        throw new IllegalArgumentException(
-          s"Illegal value '$pos' for option '$STARTING_OFFSET_OPTION_KEY', " +
-            s"acceptable values are: ${STARTING_OFFSET_OPTION_VALUES.mkString(", ")}")
-      case _ =>
-    }
-
     // Validate user-specified Kafka options
 
     if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) {
@@ -215,11 +216,11 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
       throw new IllegalArgumentException(
         s"""
            |Kafka option '${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}' is not supported.
-           |Instead set the source option '$STARTING_OFFSET_OPTION_KEY' to 'earliest' or 'latest' to
-           |specify where to start. Structured Streaming manages which offsets are consumed
+           |Instead set the source option '$STARTING_OFFSETS_OPTION_KEY' to 'earliest' or 'latest'
+           |to specify where to start. Structured Streaming manages which offsets are consumed
            |internally, rather than relying on the kafkaConsumer to do it. This will ensure that no
            |data is missed when when new topics/partitions are dynamically subscribed. Note that
-           |'$STARTING_OFFSET_OPTION_KEY' only applies when a new Streaming query is started, and
+           |'$STARTING_OFFSETS_OPTION_KEY' only applies when a new Streaming query is started, and
            |that resuming will always pick up from where the query left off. See the docs for more
            |details.
          """.stripMargin)
@@ -282,8 +283,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
 }
 
 private[kafka010] object KafkaSourceProvider {
-  private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern")
-  private val STARTING_OFFSET_OPTION_KEY = "startingoffset"
-  private val STARTING_OFFSET_OPTION_VALUES = Set("earliest", "latest")
+  private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern", "assign")
+  private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/268ccb9a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala
new file mode 100644
index 0000000..83959e5
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/StartingOffsets.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Values that can be specified for the `startingOffsets` source option.
+ */
+private[kafka010] sealed trait StartingOffsets
+
+private[kafka010] case object EarliestOffsets extends StartingOffsets
+
+private[kafka010] case object LatestOffsets extends StartingOffsets
+
+private[kafka010] case class SpecificOffsets(
+  partitionOffsets: Map[TopicPartition, Long]) extends StartingOffsets

http://git-wip-us.apache.org/repos/asf/spark/blob/268ccb9a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala
new file mode 100644
index 0000000..54b9800
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/JsonUtilsSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkFunSuite
+
+class JsonUtilsSuite extends SparkFunSuite {
+
+  test("parsing partitions") {
+    val parsed = JsonUtils.partitions("""{"topicA":[0,1],"topicB":[4,6]}""")
+    val expected = Array(
+      new TopicPartition("topicA", 0),
+      new TopicPartition("topicA", 1),
+      new TopicPartition("topicB", 4),
+      new TopicPartition("topicB", 6)
+    )
+    assert(parsed.toSeq === expected.toSeq)
+  }
+
+  test("parsing partitionOffsets") {
+    val parsed = JsonUtils.partitionOffsets(
+      """{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}""")
+
+    assert(parsed(new TopicPartition("topicA", 0)) === 23)
+    assert(parsed(new TopicPartition("topicA", 1)) === -1)
+    assert(parsed(new TopicPartition("topicB", 0)) === -2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/268ccb9a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 8b5296e..b50688e 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.util.Random
 
 import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.TopicPartition
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql.execution.streaming._
@@ -52,7 +53,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
   protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
     // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure
     // its "getOffset" is called before pushing any data. Otherwise, because of the race contion,
-    // we don't know which data should be fetched when `startingOffset` is latest.
+    // we don't know which data should be fetched when `startingOffsets` is latest.
     q.processAllAvailable()
     true
   }
@@ -155,26 +156,52 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
+  test("assign from latest offsets") {
+    val topic = newTopic()
+    testFromLatestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
+  }
+
+  test("assign from earliest offsets") {
+    val topic = newTopic()
+    testFromEarliestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
+  }
+
+  test("assign from specific offsets") {
+    val topic = newTopic()
+    testFromSpecificOffsets(topic, "assign" -> assignString(topic, 0 to 4))
+  }
+
   test("subscribing topic by name from latest offsets") {
     val topic = newTopic()
-    testFromLatestOffsets(topic, "subscribe" -> topic)
+    testFromLatestOffsets(topic, true, "subscribe" -> topic)
   }
 
   test("subscribing topic by name from earliest offsets") {
     val topic = newTopic()
-    testFromEarliestOffsets(topic, "subscribe" -> topic)
+    testFromEarliestOffsets(topic, true, "subscribe" -> topic)
+  }
+
+  test("subscribing topic by name from specific offsets") {
+    val topic = newTopic()
+    testFromSpecificOffsets(topic, "subscribe" -> topic)
   }
 
   test("subscribing topic by pattern from latest offsets") {
     val topicPrefix = newTopic()
     val topic = topicPrefix + "-suffix"
-    testFromLatestOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*")
+    testFromLatestOffsets(topic, true, "subscribePattern" -> s"$topicPrefix-.*")
   }
 
   test("subscribing topic by pattern from earliest offsets") {
     val topicPrefix = newTopic()
     val topic = topicPrefix + "-suffix"
-    testFromEarliestOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*")
+    testFromEarliestOffsets(topic, true, "subscribePattern" -> s"$topicPrefix-.*")
+  }
+
+  test("subscribing topic by pattern from specific offsets") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-suffix"
+    testFromSpecificOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*")
   }
 
   test("subscribing topic by pattern with topic deletions") {
@@ -233,6 +260,10 @@ class KafkaSourceSuite extends KafkaSourceTest {
     testBadOptions("subscribe" -> "t", "subscribePattern" -> "t.*")(
       "only one", "options can be specified")
 
+    testBadOptions("subscribe" -> "t", "assign" -> """{"a":[0]}""")(
+      "only one", "options can be specified")
+
+    testBadOptions("assign" -> "")("no topicpartitions to assign")
     testBadOptions("subscribe" -> "")("no topics to subscribe")
     testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
   }
@@ -293,7 +324,61 @@ class KafkaSourceSuite extends KafkaSourceTest {
 
   private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
 
-  private def testFromLatestOffsets(topic: String, options: (String, String)*): Unit = {
+  private def assignString(topic: String, partitions: Iterable[Int]): String = {
+    JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  private def testFromSpecificOffsets(topic: String, options: (String, String)*): Unit = {
+    val partitionOffsets = Map(
+      new TopicPartition(topic, 0) -> -2L,
+      new TopicPartition(topic, 1) -> -1L,
+      new TopicPartition(topic, 2) -> 0L,
+      new TopicPartition(topic, 3) -> 1L,
+      new TopicPartition(topic, 4) -> 2L
+    )
+    val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets)
+
+    testUtils.createTopic(topic, partitions = 5)
+    // part 0 starts at earliest, these should all be seen
+    testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0))
+    // part 1 starts at latest, these should all be skipped
+    testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1))
+    // part 2 starts at 0, these should all be seen
+    testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2))
+    // part 3 starts at 1, first should be skipped
+    testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3))
+    // part 4 starts at 2, first and second should be skipped
+    testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("startingOffsets", startingOffsets)
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+    options.foreach { case (k, v) => reader.option(k, v) }
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
+      StopStream,
+      StartStream(),
+      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), // Should get the data back on recovery
+      AddKafkaData(Set(topic), 30, 31, 32, 33, 34)(ensureDataInMultiplePartition = true),
+      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22, 30, 31, 32, 33, 34),
+      StopStream
+    )
+  }
+
+  private def testFromLatestOffsets(
+      topic: String,
+      addPartitions: Boolean,
+      options: (String, String)*): Unit = {
     testUtils.createTopic(topic, partitions = 5)
     testUtils.sendMessages(topic, Array("-1"))
     require(testUtils.getLatestOffsets(Set(topic)).size === 5)
@@ -301,7 +386,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
     val reader = spark
       .readStream
       .format("kafka")
-      .option("startingOffset", s"latest")
+      .option("startingOffsets", s"latest")
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("kafka.metadata.max.age.ms", "1")
     options.foreach { case (k, v) => reader.option(k, v) }
@@ -324,7 +409,9 @@ class KafkaSourceSuite extends KafkaSourceTest {
       AddKafkaData(Set(topic), 7, 8),
       CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
       AssertOnQuery("Add partitions") { query: StreamExecution =>
-        testUtils.addPartitions(topic, 10)
+        if (addPartitions) {
+          testUtils.addPartitions(topic, 10)
+        }
         true
       },
       AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
@@ -332,7 +419,10 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
-  private def testFromEarliestOffsets(topic: String, options: (String, String)*): Unit = {
+  private def testFromEarliestOffsets(
+      topic: String,
+      addPartitions: Boolean,
+      options: (String, String)*): Unit = {
     testUtils.createTopic(topic, partitions = 5)
     testUtils.sendMessages(topic, (1 to 3).map { _.toString }.toArray)
     require(testUtils.getLatestOffsets(Set(topic)).size === 5)
@@ -340,7 +430,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
     val reader = spark.readStream
     reader
       .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
-      .option("startingOffset", s"earliest")
+      .option("startingOffsets", s"earliest")
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("kafka.metadata.max.age.ms", "1")
     options.foreach { case (k, v) => reader.option(k, v) }
@@ -360,7 +450,9 @@ class KafkaSourceSuite extends KafkaSourceTest {
       StartStream(),
       CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
       AssertOnQuery("Add partitions") { query: StreamExecution =>
-        testUtils.addPartitions(topic, 10)
+        if (addPartitions) {
+          testUtils.addPartitions(topic, 10)
+        }
         true
       },
       AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),

http://git-wip-us.apache.org/repos/asf/spark/blob/268ccb9a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 3eb8a73..9b24ccd 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -201,11 +201,23 @@ class KafkaTestUtils extends Logging {
 
   /** Send the array of messages to the Kafka broker */
   def sendMessages(topic: String, messages: Array[String]): Seq[(String, RecordMetadata)] = {
+    sendMessages(topic, messages, None)
+  }
+
+  /** Send the array of messages to the Kafka broker, optionally to a specific partition */
+  def sendMessages(
+      topic: String,
+      messages: Array[String],
+      partition: Option[Int]): Seq[(String, RecordMetadata)] = {
     producer = new KafkaProducer[String, String](producerConfiguration)
     val offsets = try {
       messages.map { m =>
+        val record = partition match {
+          case Some(p) => new ProducerRecord[String, String](topic, p, null, m)
+          case None => new ProducerRecord[String, String](topic, m)
+        }
         val metadata =
-          producer.send(new ProducerRecord[String, String](topic, m)).get(10, TimeUnit.SECONDS)
+          producer.send(record).get(10, TimeUnit.SECONDS)
           logInfo(s"\tSent $m to partition ${metadata.partition}, offset ${metadata.offset}")
         (m, metadata)
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org