You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "WweiL (via GitHub)" <gi...@apache.org> on 2023/07/11 16:55:55 UTC

[GitHub] [spark] WweiL commented on a diff in pull request #41791: [SPARK-44285] MSK IAM Support

WweiL commented on code in PR #41791:
URL: https://github.com/apache/spark/pull/41791#discussion_r1260010746


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala:
##########
@@ -66,77 +66,78 @@ class KafkaSourceProviderSuite extends SparkFunSuite with SharedSparkSession {
   }
 
   /*
-    the goal of this test is to verify the functionality of the aws msk IAM auth
+    the goal of these tests is to verify the functionality of the aws msk IAM auth
     how this test works:
-    if testType contains source/sink, kafka is used as a source/sink option respectively
-    if testType contains stream/batch, it is used either in readStream/read or writeStream/write
-
     In each case, we test that the library paths are discoverable, since
     if the library could not be found, a different error message would be thrown.
-    Although this broker exists, it does not have IAM capabilities and thus
-    it is expected that a timeout error will be thrown.
+    The kafka client keeps calling the 'describeTopics' endpoint on the broker
+    while using IAM authentication which times out since it doesn't have IAM auth enabled.
+    Thus, it is expected that a timeout error will be thrown.
   */
-  Seq("source and stream", "sink and stream",
-    "source and batch", "sink and batch").foreach { testType =>
-    test(s"test MSK IAM auth on kafka '$testType' side") {
-      val options: Map[String, String] = Map(
-        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
-        "subscribe" -> "msk-123",
-        "startingOffsets" -> "earliest",
-        "kafka.sasl.mechanism" -> "AWS_MSK_IAM",
-        "kafka.sasl.jaas.config" ->
-          "software.amazon.msk.auth.iam.IAMLoginModule required;",
-        "kafka.security.protocol" -> "SASL_SSL",
-        "kafka.sasl.client.callback.handler.class" ->
-          "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
-        "retries" -> "0",
-        "kafka.request.timeout.ms" -> "3000",
-        "kafka.default.api.timeout.ms" -> "3000",
-        "kafka.max.block.ms" -> "3000"
-      )
-
-      testUtils.createTopic(options("subscribe"))
-
-      var e: Throwable = null
-      if (testType.contains("stream")) {
-        if (testType.contains("source")) {
-          e = intercept[StreamingQueryException] {
-            spark.readStream.format("kafka").options(options).load()
-              .writeStream.format("console").start().processAllAvailable()
-          }
-          TestUtils.assertExceptionMsg(e, "Timed out waiting for a node assignment")
-        } else {
-          e = intercept[StreamingQueryException] {
-            spark.readStream.format("rate").option("rowsPerSecond", 10).load()
-              .withColumn("value", col("value").cast(StringType)).writeStream
-              .format("kafka").options(options).option("checkpointLocation", "temp/testing")
-              .option("topic", options("subscribe")).start().processAllAvailable()
-          }
-          TestUtils.assertExceptionMsg(e, s"TimeoutException: Topic ${options("subscribe")} " +
-            s"not present in metadata")
-        }
-      } else {
-        if (testType.contains("source")) {
-          e = intercept[ExecutionException] {
-            spark.read.format("kafka").options(options).load()
-              .write.format("console").save()
-          }
-          TestUtils.assertExceptionMsg(e, "Timed out waiting for a node assignment")
-        } else {
-          val schema = new StructType().add("value", "string")
-          e = intercept[SparkException] {
-            spark.createDataFrame(Seq(Row("test"), Row("test2")).asJava, schema)
-              .write.mode("append").format("kafka")
-              .options(options).option("checkpointLocation", "temp/testing/1")
-              .option("topic", options("subscribe")).save()
-          }
-          TestUtils.assertExceptionMsg(e, s"TimeoutException: Topic ${options("subscribe")} " +
-            s"not present in metadata")
-        }
-      }
+  private val mskIAMTestKafkaOptions: Map[String, String] = Map(
+    "subscribe" -> "msk-123",
+    "startingOffsets" -> "earliest",
+    "kafka.sasl.mechanism" -> "AWS_MSK_IAM",
+    "kafka.sasl.jaas.config" ->
+      "software.amazon.msk.auth.iam.IAMLoginModule required;",
+    "kafka.security.protocol" -> "SASL_SSL",
+    "kafka.sasl.client.callback.handler.class" ->
+      "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
+    "retries" -> "0",
+    "kafka.request.timeout.ms" -> "3000",
+    "kafka.default.api.timeout.ms" -> "3000",
+    "kafka.max.block.ms" -> "3000"
+  )
+
+  test("test MSK IAM auth with streaming API and with kafka source") {
+    testUtils.createTopic(mskIAMTestKafkaOptions("subscribe"))
+    val e = intercept[StreamingQueryException] {
+      spark.readStream.format("kafka").options(mskIAMTestKafkaOptions)
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress).load()
+        .writeStream.format("console").start().processAllAvailable()
+    }
+    TestUtils.assertExceptionMsg(e, "Timed out waiting for a node assignment")
+    testUtils.deleteTopic(mskIAMTestKafkaOptions("subscribe"))
+  }
+
+  test("test MSK IAM auth with streaming API and with kafka sink") {
+    testUtils.createTopic(mskIAMTestKafkaOptions("subscribe"))
+    val e = intercept[StreamingQueryException] {
+      spark.readStream.format("rate").option("rowsPerSecond", 10).load()
+        .withColumn("value", col("value").cast(StringType)).writeStream
+        .format("kafka").options(mskIAMTestKafkaOptions).option("kafka.bootstrap.servers",
+        testUtils.brokerAddress).option("checkpointLocation", "temp/testing")

Review Comment:
   better to use a `withTempDir` method even if this location won't be created



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org