You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/04/04 10:39:53 UTC

[GitHub] [spark] gaborgsomogyi commented on a change in pull request #19096: [SPARK-21869][SS] A cached Kafka producer should not be closed if any task is using it - adds inuse tracking.

gaborgsomogyi commented on a change in pull request #19096: [SPARK-21869][SS] A cached Kafka producer should not be closed if any task is using it - adds inuse tracking.
URL: https://github.com/apache/spark/pull/19096#discussion_r272119893
 
 

 ##########
 File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala
 ##########
 @@ -18,60 +18,206 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
-import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
+
+import scala.collection.mutable
+import scala.util.Random
 
-import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.scalatest.PrivateMethodTester
 
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
 
-class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester with KafkaTest {
 
-  type KP = KafkaProducer[Array[Byte], Array[Byte]]
+class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest {
 
   protected override def beforeEach(): Unit = {
     super.beforeEach()
     CachedKafkaProducer.clear()
   }
 
-  test("Should return the cached instance on calling getOrCreate with same params.") {
-    val kafkaParams = new ju.HashMap[String, Object]()
-    kafkaParams.put("acks", "0")
-    // Here only host should be resolvable, it does not need a running instance of kafka server.
-    kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
-    kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
-    kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
-    val producer = CachedKafkaProducer.getOrCreate(kafkaParams)
-    val producer2 = CachedKafkaProducer.getOrCreate(kafkaParams)
-    assert(producer == producer2)
-
-    val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]]('getAsMap)
-    val map = CachedKafkaProducer.invokePrivate(cacheMap())
+  test("Should return the cached instance on calling acquire with same params.") {
+    val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams
+    val producer = CachedKafkaProducer.acquire(kafkaParams)
+    val producer2 = CachedKafkaProducer.acquire(kafkaParams)
+    assert(producer.kafkaProducer == producer2.kafkaProducer)
+    assert(producer.getInUseCount == 2)
+    val map = CachedKafkaProducer.getAsMap
     assert(map.size == 1)
   }
 
-  test("Should close the correct kafka producer for the given kafkaPrams.") {
+  test("Should return the new instance on calling acquire with different params.") {
+    val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams
+    val producer = CachedKafkaProducer.acquire(kafkaParams)
+    kafkaParams.remove("ack") // mutate the kafka params.
+    val producer2 = CachedKafkaProducer.acquire(kafkaParams)
+    assert(producer.kafkaProducer != producer2.kafkaProducer)
+    assert(producer.getInUseCount == 1)
+    assert(producer2.getInUseCount == 1)
+    val map = CachedKafkaProducer.getAsMap
+    assert(map.size == 2)
+  }
+
+  test("Automatically remove a failing kafka producer from cache.") {
+    import testImplicits._
+    val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
+    val ex = intercept[SparkException] {
+      // This will fail because the service is not reachable.
+      df.write
+        .format("kafka")
+        .option("topic", "topic")
+        .option("kafka.retries", "1")
+        .option("kafka.max.block.ms", "2")
+        .option("kafka.bootstrap.servers", "12.0.0.1:39022")
+        .save()
+    }
+    assert(ex.getMessage.contains("org.apache.kafka.common.errors.TimeoutException"),
+      "Spark command should fail due to service not reachable.")
+    // Since failing kafka producer is released on error and also invalidated, it should not be in
+    // cache.
+    val map = CachedKafkaProducer.getAsMap
+    assert(map.size == 0)
+  }
+
+  test("Should not close a producer in-use.") {
+    val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams
+    val producer: CachedKafkaProducer = CachedKafkaProducer.acquire(kafkaParams)
+    producer.kafkaProducer // initializing the producer.
+    assert(producer.getInUseCount == 1)
+    // Explicitly cause the producer from guava cache to be evicted.
+    CachedKafkaProducer.evict(producer.getKafkaParams)
+    assert(producer.getInUseCount == 1)
+    assert(!producer.isClosed, "An in-use producer should not be closed.")
+  }
+
+  private def generateKafkaParams: ju.HashMap[String, Object] = {
     val kafkaParams = new ju.HashMap[String, Object]()
-    kafkaParams.put("acks", "0")
+    kafkaParams.put("ack", "0")
     kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
     kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
     kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
-    val producer: KP = CachedKafkaProducer.getOrCreate(kafkaParams)
-    kafkaParams.put("acks", "1")
-    val producer2: KP = CachedKafkaProducer.getOrCreate(kafkaParams)
-    // With updated conf, a new producer instance should be created.
-    assert(producer != producer2)
-
-    val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]]('getAsMap)
-    val map = CachedKafkaProducer.invokePrivate(cacheMap())
-    assert(map.size == 2)
+    kafkaParams
+  }
+}
+
+class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest {
 
 Review comment:
   Re-tested `test("concurrent use of CachedKafkaProducer")`, and the same exception occurred once the number of open file descriptors reached ~10239. In `test("SPARK-23623: concurrent use of KafkaDataConsumer")` this number fluctuates around 500. You can check it by adding the following code after `acquire`:
   ```
   import java.lang.management.ManagementFactory
   import com.sun.management.UnixOperatingSystemMXBean
   System.out.println("Number of open fd: " + ManagementFactory.getOperatingSystemMXBean.asInstanceOf[UnixOperatingSystemMXBean].getOpenFileDescriptorCount)
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org