You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Gabor Somogyi (JIRA)" <ji...@apache.org> on 2018/12/12 08:47:00 UTC
[jira] [Resolved] (SPARK-23663) Spark Streaming Kafka 010 , fails
with "java.util.ConcurrentModificationException: KafkaConsumer is not safe
for multi-threaded access"
[ https://issues.apache.org/jira/browse/SPARK-23663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gabor Somogyi resolved SPARK-23663.
-----------------------------------
Resolution: Duplicate
Fix Version/s: 2.4.0
This should be fixed in SPARK-19185.
> Spark Streaming Kafka 010 , fails with "java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access"
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-23663
> URL: https://issues.apache.org/jira/browse/SPARK-23663
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.2.0
> Environment: Spark 2.2.0
> Spark streaming kafka 010
>
> Reporter: kaushik srinivas
> Priority: Major
> Fix For: 2.4.0
>
>
> test being tried:
> 10 kafka topics created. Streamed with avro data from kafka producers.
> org.apache.spark.streaming.kafka010 used for creating directStream to kafka.
> A single direct stream is created for all the ten topics.
> And on each RDD(batch of 50 seconds), key of kafka consumer record is checked and separate RDDs are created for separate topics.
> Each topic has records with key as topic name and value of avro messages.
> Finally ten RDDs are converted to data frames and registered as separate temp tables.
> Once all the temp tables are registered, few sql queries are run on these temp tables.
>
> Exception seen:
> 18/03/12 11:58:34 WARN TaskSetManager: Lost task 23.0 in stage 4.0 (TID 269, 192.168.24.145, executor 7): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:80)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 18/03/12 11:58:34 INFO TaskSetManager: Starting task 23.1 in stage 4.0 (TID 828, 192.168.24.145, executor 7, partition 23, PROCESS_LOCAL, 4758 bytes)
> 18/03/12 11:58:34 INFO TaskSetManager: Lost task 23.1 in stage 4.0 (TID 828) on 192.168.24.145, executor 7: java.util.ConcurrentModificationException (KafkaConsumer is not safe for multi-threaded access) [duplicate 1]
> 18/03/12 11:58:34 INFO TaskSetManager: Starting task 23.2 in stage 4.0 (TID 829, 192.168.24.145, executor 7, partition 23, PROCESS_LOCAL, 4758 bytes)
> 18/03/12 11:58:40 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 30, 192.168.24.147, executor 6): java.lang.IllegalStateException: This consumer has already been closed.
> at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 18/03/12 11:58:40 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 830, 192.168.24.147, executor 6, partition 0, PROCESS_LOCAL, 4758 bytes)
> 18/03/12 11:58:45 WARN TaskSetManager: Lost task 0.1 in stage 4.0 (TID 296, 192.168.24.147, executor 6): java.lang.IllegalStateException: This consumer has already been closed.
> at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Code snippet:
> val stream = KafkaUtils.createDirectStream[Object, Object](ssc,
> PreferConsistent,
> Subscribe[Object, Object](topicsArr, kafkaParams)
> )
> val tbl = topicsArr(0).toString
> stream.foreachRDD(rdd => {
> var ca = new Array[String](0)
> var ss = new Array[String](0)
> if (!rdd.isEmpty())
> {
> val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
> import sqlContext.implicits._
> rdd.foreach(record =>
> {
> record.key() match {
> case "customer_address" => ca=Array.concat(ca,Array(record.value().toString))
> case "store_sales" => ss=Array.concat(ss,Array(record.value().toString))
> case _ => println("Invalid Key")
> };
> })
> //val topicValueStrings = rdd.map(record => (record.value()).toString)
> val df_ca = sqlContext.read.json(spark.sparkContext.parallelize(ca))
> val df_ss = sqlContext.read.json(spark.sparkContext.parallelize(ss))
> try{
> df_ca.registerTempTable("customer_address")
> df_ss.registerTempTable("store_sales")
> }
> catch{
> case e : Throwable => {
> println(e.getStackTrace())
> }
> }
> try{
> //spark.sql("show tables")
> println ("======New Batch=======")
> spark.sql(s"select count(1) as cnt,'customer_address' as tableName from customer_address").show()
> spark.sql(s"select count(1) as cnt,'store_sales' as tableName from store_sales").show()
> }
> catch{
> case e : Throwable => {
> println(e.getStackTrace())}
> }
>
> Spark session is created with below confs:
> val spark = SparkSession.builder()
> .appName(appname)
> .config("hive.metastore.uris", hivemeta)
> .enableHiveSupport()
> .config("hive.exec.dynamic.partition", "true")
> .config("hive.exec.dynamic.partition.mode", "nonstrict")
> .config("spark.driver.allowMultipleContexts", "true")
> .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> .config("spark.kryoserializer.buffer.mb", "64")
> .config("spark.sql.tungsten.enabled", "true")
> .config("spark.app.id", appname)
> .config("spark.speculation","false")
> .config("spark.sql.parquet.mergeSchema", "false")
> .getOrCreate()
> Note: spark.streaming.kafka.consumer.cache.enabled is not made false.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org