You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/07/11 08:44:15 UTC

[GitHub] [spark] ScrapCodes commented on a change in pull request #19096: [SPARK-21869][SS] A cached Kafka producer should not be closed if any task is using it - adds inuse tracking.

ScrapCodes commented on a change in pull request #19096: [SPARK-21869][SS] A cached Kafka producer should not be closed if any task is using it - adds inuse tracking.
URL: https://github.com/apache/spark/pull/19096#discussion_r302428339
 
 

 ##########
 File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala
 ##########
 @@ -18,60 +18,207 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
-import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.{Executors, TimeUnit}
 
-import org.apache.kafka.clients.producer.KafkaProducer
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.scalatest.PrivateMethodTester
 
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
 
-class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester with KafkaTest {
 
-  type KP = KafkaProducer[Array[Byte], Array[Byte]]
+class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest {
 
   protected override def beforeEach(): Unit = {
     super.beforeEach()
     CachedKafkaProducer.clear()
   }
 
-  test("Should return the cached instance on calling getOrCreate with same params.") {
-    val kafkaParams = new ju.HashMap[String, Object]()
-    kafkaParams.put("acks", "0")
-    // Here only host should be resolvable, it does not need a running instance of kafka server.
-    kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
-    kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
-    kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
-    val producer = CachedKafkaProducer.getOrCreate(kafkaParams)
-    val producer2 = CachedKafkaProducer.getOrCreate(kafkaParams)
-    assert(producer == producer2)
-
-    val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]]('getAsMap)
-    val map = CachedKafkaProducer.invokePrivate(cacheMap())
+  test("Should return the cached instance on calling acquire with same params.") {
+    val kafkaParams = generateKafkaParams
+    val producer = CachedKafkaProducer.acquire(kafkaParams)
+    val producer2 = CachedKafkaProducer.acquire(kafkaParams)
+    assert(producer.kafkaProducer == producer2.kafkaProducer)
+    assert(producer.getInUseCount == 2)
+    val map = CachedKafkaProducer.getAsMap
     assert(map.size == 1)
   }
 
-  test("Should close the correct kafka producer for the given kafkaPrams.") {
+  test("Should return the new instance on calling acquire with different params.") {
+    val kafkaParams = generateKafkaParams
+    val producer = CachedKafkaProducer.acquire(kafkaParams)
+    kafkaParams.remove("acks") // mutate the kafka params.
+    val producer2 = CachedKafkaProducer.acquire(kafkaParams)
+    assert(producer.kafkaProducer != producer2.kafkaProducer)
+    assert(producer.getInUseCount == 1)
+    assert(producer2.getInUseCount == 1)
+    val map = CachedKafkaProducer.getAsMap
+    assert(map.size == 2)
+  }
+
+  test("Automatically remove a failing kafka producer from cache.") {
+    import testImplicits._
+    val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
+    val ex = intercept[SparkException] {
+      // This will fail because the service is not reachable.
+      df.write
+        .format("kafka")
+        .option("topic", "topic")
+        .option("kafka.retries", "1")
+        .option("kafka.max.block.ms", "2")
+        .option("kafka.bootstrap.servers", "12.0.0.1:39022")
+        .save()
+    }
+    assert(ex.getMessage.contains("TimeoutException"),
 
 Review comment:
   Hmm, the exception does get logged as an ERROR, but it does not get thrown.
   ```
   19/07/11 12:07:26.016 Executor task launch worker for task 0 ERROR Utils: Aborting task
   org.apache.kafka.common.errors.TimeoutException: Topic topic not present in metadata after 2 ms.
   19/07/11 12:07:26.019 Executor task launch worker for task 0 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 0, attempt 0stage 0.0)
   
   ```
   
   Do you know what is going on here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org