You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/07/04 04:59:10 UTC
[spark] branch master updated: [SPARK-28142][SS][TEST][FOLLOWUP]
Add configuration check test on Kafka continuous stream
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4212a30 [SPARK-28142][SS][TEST][FOLLOWUP] Add configuration check test on Kafka continuous stream
4212a30 is described below
commit 4212a308839cfdf066d63c7f0589511155f56a2e
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Wed Jul 3 21:58:35 2019 -0700
[SPARK-28142][SS][TEST][FOLLOWUP] Add configuration check test on Kafka continuous stream
## What changes were proposed in this pull request?
This patch adds the missing unit test (UT) covering the behavior changed by the original patch #24942.
## How was this patch tested?
Newly added UT.
Closes #24999 from HeartSaVioR/SPARK-28142-FOLLOWUP.
Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../sql/kafka010/KafkaSourceProviderSuite.scala | 102 +++++++++++++++++++++
1 file changed, 102 insertions(+)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala
new file mode 100644
index 0000000..2fcf37a
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.sql.sources.v2.reader.Scan
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * Tests for [[KafkaSourceProvider]] checking that source options are looked up
+ * case-insensitively by both the micro-batch ([[KafkaMicroBatchStream]]) and the
+ * continuous ([[KafkaContinuousStream]]) stream implementations.
+ */
+class KafkaSourceProviderSuite
+  extends SparkFunSuite
+  with BeforeAndAfterEach
+  with PrivateMethodTester {
+
+  // Handles used with PrivateMethodTester to read private members of the stream classes.
+  // `Symbol(...)` is used instead of symbol literals, which are deprecated in Scala 2.13.
+  private val pollTimeoutMsMethod = PrivateMethod[Long](Symbol("pollTimeoutMs"))
+  private val maxOffsetsPerTriggerMethod =
+    PrivateMethod[Option[Long]](Symbol("maxOffsetsPerTrigger"))
+
+  // Clear the mock SparkEnv installed by the micro-batch test so it cannot leak into other tests.
+  override protected def afterEach(): Unit = {
+    SparkEnv.set(null)
+    super.afterEach()
+  }
+
+  test("micro-batch mode - options should be handled as case-insensitive") {
+    def verifyFieldsInMicroBatchStream(
+        options: CaseInsensitiveStringMap,
+        expectedPollTimeoutMs: Long,
+        expectedMaxOffsetsPerTrigger: Option[Long]): Unit = {
+      // KafkaMicroBatchStream reads Spark conf from SparkEnv for default value
+      // hence we set mock SparkEnv here before creating KafkaMicroBatchStream
+      val sparkEnv = mock(classOf[SparkEnv])
+      when(sparkEnv.conf).thenReturn(new SparkConf())
+      SparkEnv.set(sparkEnv)
+
+      val scan = getKafkaDataSourceScan(options)
+      val stream = scan.toMicroBatchStream("dummy").asInstanceOf[KafkaMicroBatchStream]
+
+      assert(getField(stream, pollTimeoutMsMethod) === expectedPollTimeoutMs)
+      assert(getField(stream, maxOffsetsPerTriggerMethod) === expectedMaxOffsetsPerTrigger)
+    }
+
+    val expectedValue = 1000L
+    buildCaseInsensitiveStringMapForUpperAndLowerKey(
+      KafkaSourceProvider.CONSUMER_POLL_TIMEOUT -> expectedValue.toString,
+      KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER -> expectedValue.toString)
+      .foreach(verifyFieldsInMicroBatchStream(_, expectedValue, Some(expectedValue)))
+  }
+
+  test("SPARK-28142 - continuous mode - options should be handled as case-insensitive") {
+    def verifyFieldsInContinuousStream(
+        options: CaseInsensitiveStringMap,
+        expectedPollTimeoutMs: Long): Unit = {
+      // NOTE(review): no mock SparkEnv is installed here, which assumes KafkaContinuousStream
+      // does not consult SparkEnv while being constructed - verify if this starts to NPE.
+      val scan = getKafkaDataSourceScan(options)
+      val stream = scan.toContinuousStream("dummy").asInstanceOf[KafkaContinuousStream]
+      assert(getField(stream, pollTimeoutMsMethod) === expectedPollTimeoutMs)
+    }
+
+    // Long literal, consistent with the micro-batch test and the Long-typed expectation.
+    val expectedValue = 1000L
+    buildCaseInsensitiveStringMapForUpperAndLowerKey(
+      KafkaSourceProvider.CONSUMER_POLL_TIMEOUT -> expectedValue.toString)
+      .foreach(verifyFieldsInContinuousStream(_, expectedValue))
+  }
+
+  /**
+   * Builds two option maps from `options`: one with every key upper-cased and one with every
+   * key lower-cased (both via `Locale.ROOT`). Each is completed by
+   * `buildKafkaSourceCaseInsensitiveStringMap`.
+   *
+   * @param options key/value pairs whose keys are re-cased
+   * @return the upper-cased variant followed by the lower-cased variant
+   */
+  private def buildCaseInsensitiveStringMapForUpperAndLowerKey(
+      options: (String, String)*): Seq[CaseInsensitiveStringMap] = {
+    Seq(
+      options.map { case (key, value) => key.toUpperCase(Locale.ROOT) -> value },
+      options.map { case (key, value) => key.toLowerCase(Locale.ROOT) -> value })
+      .map(entries => buildKafkaSourceCaseInsensitiveStringMap(entries: _*))
+  }
+
+  /**
+   * Wraps `options` in a [[CaseInsensitiveStringMap]] after adding dummy values for
+   * `kafka.bootstrap.servers` and `subscribe`.
+   *
+   * @param options caller-supplied options; an entry with exactly the same key as one of the
+   *                dummy entries replaces it
+   */
+  private def buildKafkaSourceCaseInsensitiveStringMap(
+      options: (String, String)*): CaseInsensitiveStringMap = {
+    val requiredOptions = Map("kafka.bootstrap.servers" -> "dummy", "subscribe" -> "dummy")
+    // Apply caller options last so the dummy defaults never overwrite an explicit value.
+    new CaseInsensitiveStringMap((requiredOptions ++ options).asJava)
+  }
+
+  /** Creates a Kafka table from `options` and builds a [[Scan]] from it with the same options. */
+  private def getKafkaDataSourceScan(options: CaseInsensitiveStringMap): Scan = {
+    val provider = new KafkaSourceProvider()
+    provider.getTable(options).newScanBuilder(options).build()
+  }
+
+  /** Reads the private member of `obj` identified by `method` using [[PrivateMethodTester]]. */
+  private def getField[T](obj: AnyRef, method: PrivateMethod[T]): T = {
+    obj.invokePrivate(method())
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org