Posted to issues@spark.apache.org by "Gabor Somogyi (JIRA)" <ji...@apache.org> on 2018/12/12 08:47:00 UTC

[jira] [Closed] (SPARK-23663) Spark Streaming Kafka 010, fails with "java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access"

     [ https://issues.apache.org/jira/browse/SPARK-23663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gabor Somogyi closed SPARK-23663.
---------------------------------

> Spark Streaming Kafka 010, fails with "java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access"
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-23663
>                 URL: https://issues.apache.org/jira/browse/SPARK-23663
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.2.0
>         Environment: Spark 2.2.0 
> Spark streaming kafka 010
>  
>            Reporter: kaushik srinivas
>            Priority: Major
>             Fix For: 2.4.0
>
>
> Test being tried:
> Ten Kafka topics are created and streamed with Avro data from Kafka producers.
> org.apache.spark.streaming.kafka010 is used to create a direct stream to Kafka.
> A single direct stream is created for all ten topics.
> On each RDD (batch of 50 seconds), the key of each Kafka consumer record is checked and separate RDDs are created for separate topics.
> Each topic has records whose key is the topic name and whose value is an Avro message.
> Finally, the ten RDDs are converted to DataFrames and registered as separate temp tables.
> Once all the temp tables are registered, a few SQL queries are run on these temp tables.
>  
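> For illustration only, a minimal sketch (an editorial assumption, not the reporter's code, which is quoted further down) of the per-topic split described above; it reuses the stream, topic names, and record accessors from that snippet:
> // Hedged sketch: build one filtered RDD per topic, keyed on record.key(),
> // which in this test carries the topic name.
> stream.foreachRDD { rdd =>
>   Seq("customer_address", "store_sales").foreach { topic =>
>     // Values of this batch whose key matches the topic name
>     val perTopicValues = rdd.filter(_.key().toString == topic).map(_.value().toString)
>     // perTopicValues could then be read into a DataFrame and registered as a temp table
>   }
> }
>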
> Exception seen:
> 18/03/12 11:58:34 WARN TaskSetManager: Lost task 23.0 in stage 4.0 (TID 269, 192.168.24.145, executor 7): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>  at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>  at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>  at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:80)
>  at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>  at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
>  at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
>  at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
>  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  at org.apache.spark.scheduler.Task.run(Task.scala:108)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> 18/03/12 11:58:34 INFO TaskSetManager: Starting task 23.1 in stage 4.0 (TID 828, 192.168.24.145, executor 7, partition 23, PROCESS_LOCAL, 4758 bytes)
> 18/03/12 11:58:34 INFO TaskSetManager: Lost task 23.1 in stage 4.0 (TID 828) on 192.168.24.145, executor 7: java.util.ConcurrentModificationException (KafkaConsumer is not safe for multi-threaded access) [duplicate 1]
> 18/03/12 11:58:34 INFO TaskSetManager: Starting task 23.2 in stage 4.0 (TID 829, 192.168.24.145, executor 7, partition 23, PROCESS_LOCAL, 4758 bytes)
> 18/03/12 11:58:40 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 30, 192.168.24.147, executor 6): java.lang.IllegalStateException: This consumer has already been closed.
>  at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
>  at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>  at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
>  at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>  at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
>  at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
>  at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
>  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  at org.apache.spark.scheduler.Task.run(Task.scala:108)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> 18/03/12 11:58:40 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 830, 192.168.24.147, executor 6, partition 0, PROCESS_LOCAL, 4758 bytes)
> 18/03/12 11:58:45 WARN TaskSetManager: Lost task 0.1 in stage 4.0 (TID 296, 192.168.24.147, executor 6): java.lang.IllegalStateException: This consumer has already been closed.
>  at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
>  at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
>  at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
>  at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>  at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
>  at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
>  at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
>  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  at org.apache.spark.scheduler.Task.run(Task.scala:108)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
>  
> Code snippet:
> val stream = KafkaUtils.createDirectStream[Object, Object](ssc,
>   PreferConsistent,
>   Subscribe[Object, Object](topicsArr, kafkaParams)
> )
> val tbl = topicsArr(0).toString
> stream.foreachRDD(rdd => {
>   var ca = new Array[String](0)
>   var ss = new Array[String](0)
>   if (!rdd.isEmpty()) {
>     val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
>     import sqlContext.implicits._
>     rdd.foreach(record => {
>       record.key() match {
>         case "customer_address" => ca = Array.concat(ca, Array(record.value().toString))
>         case "store_sales" => ss = Array.concat(ss, Array(record.value().toString))
>         case _ => println("Invalid Key")
>       }
>     })
>     //val topicValueStrings = rdd.map(record => (record.value()).toString)
>     val df_ca = sqlContext.read.json(spark.sparkContext.parallelize(ca))
>     val df_ss = sqlContext.read.json(spark.sparkContext.parallelize(ss))
>     try {
>       df_ca.registerTempTable("customer_address")
>       df_ss.registerTempTable("store_sales")
>     } catch {
>       case e: Throwable => println(e.getStackTrace())
>     }
>     try {
>       //spark.sql("show tables")
>       println("======New Batch=======")
>       spark.sql(s"select count(1) as cnt,'customer_address' as tableName from customer_address").show()
>       spark.sql(s"select count(1) as cnt,'store_sales' as tableName from store_sales").show()
>     } catch {
>       case e: Throwable => println(e.getStackTrace())
>     }
>   }
> })
>  
> The Spark session is created with the configs below:
> val spark = SparkSession.builder()
>  .appName(appname)
>  .config("hive.metastore.uris", hivemeta)
>  .enableHiveSupport()
>  .config("hive.exec.dynamic.partition", "true")
>  .config("hive.exec.dynamic.partition.mode", "nonstrict")
>  .config("spark.driver.allowMultipleContexts", "true")
>  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>  .config("spark.kryoserializer.buffer.mb", "64")
>  .config("spark.sql.tungsten.enabled", "true")
>  .config("spark.app.id", appname)
>  .config("spark.speculation","false")
>  .config("spark.sql.parquet.mergeSchema", "false")
>  .getOrCreate()
> Note: spark.streaming.kafka.consumer.cache.enabled is not set to false.
>  
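> As a hedged illustration (an editorial addition, not something the reporter ran): the cache referenced in the note above could be switched off by adding the config below to the same SparkSession builder; spark.streaming.kafka.consumer.cache.enabled is the spark-streaming-kafka-0-10 setting behind the CachedKafkaConsumer frames seen in the stack traces.
> // Hedged sketch: disable the cached Kafka consumers, e.g. to rule the cache in or out.
> val spark = SparkSession.builder()
>   .config("spark.streaming.kafka.consumer.cache.enabled", "false")
>   // ... remaining configs as above ...
>   .getOrCreate()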



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org