You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/07/05 10:07:35 UTC

[GitHub] [spark] gaborgsomogyi commented on a change in pull request #24967: [SPARK-28163][SS] Use CaseInsensitiveMap for KafkaOffsetReader

gaborgsomogyi commented on a change in pull request #24967: [SPARK-28163][SS] Use CaseInsensitiveMap for KafkaOffsetReader
URL: https://github.com/apache/spark/pull/24967#discussion_r300620741
 
 

 ##########
 File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala
 ##########
 @@ -30,52 +30,100 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester {
 
+  private val expected = "1111"
   private val pollTimeoutMsMethod = PrivateMethod[Long]('pollTimeoutMs)
   private val maxOffsetsPerTriggerMethod = PrivateMethod[Option[Long]]('maxOffsetsPerTrigger)
+  private val offsetReaderMethod = PrivateMethod[KafkaOffsetReader]('offsetReader)
+  private val fetchOffsetNumRetriesMethod = PrivateMethod[Int]('fetchOffsetNumRetries)
+  private val fetchOffsetRetryIntervalMsMethod = PrivateMethod[Long]('fetchOffsetRetryIntervalMs)
 
   override protected def afterEach(): Unit = {
     SparkEnv.set(null)
     super.afterEach()
   }
 
+  test("batch mode - options should be handled as case-insensitive") {
+    verifyFieldsInBatch(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, batch => {
+      assert(expected.toLong === getField(batch, pollTimeoutMsMethod))
+    })
+  }
+
   test("micro-batch mode - options should be handled as case-insensitive") {
-    def verifyFieldsInMicroBatchStream(
-        options: CaseInsensitiveStringMap,
-        expectedPollTimeoutMs: Long,
-        expectedMaxOffsetsPerTrigger: Option[Long]): Unit = {
-      // KafkaMicroBatchStream reads Spark conf from SparkEnv for default value
-      // hence we set mock SparkEnv here before creating KafkaMicroBatchStream
-      val sparkEnv = mock(classOf[SparkEnv])
-      when(sparkEnv.conf).thenReturn(new SparkConf())
-      SparkEnv.set(sparkEnv)
+    verifyFieldsInMicroBatchStream(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, stream => {
+      assert(expected.toLong === getField(stream, pollTimeoutMsMethod))
+    })
+    verifyFieldsInMicroBatchStream(KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER, expected, stream => {
+      assert(Some(expected.toLong) === getField(stream, maxOffsetsPerTriggerMethod))
+    })
+  }
 
-      val scan = getKafkaDataSourceScan(options)
-      val stream = scan.toMicroBatchStream("dummy").asInstanceOf[KafkaMicroBatchStream]
+  test("SPARK-28163 - micro-batch mode - options should be handled as case-insensitive") {
 
 Review comment:
   I've previously received many comments saying we have to keep the JIRA number in the test name when the change was a bugfix. In this situation I had the same feeling as you: I would personally prefer to have 3 tests in this case (`batch`, `micro-batch` and `continuous`) and forget about the JIRA number. So, all in all, how it's organized will depend on the committer.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org