Posted to dev@spark.apache.org by Kalvin Chau <ka...@gmail.com> on 2017/01/11 22:38:43 UTC

[Streaming] ConcurrentModificationExceptions when Windowing

Hi,

We've been running into ConcurrentModificationExceptions ("KafkaConsumer is
not safe for multi-threaded access") with the CachedKafkaConsumer. I've been
working through debugging this issue, and after looking through some of the
Spark source code I think this is a bug.

Our setup is:
Spark 2.0.2, running in Mesos 0.28.0-2 in client mode, using
Spark-Streaming-Kafka-010
spark.executor.cores 1
spark.mesos.extra.cores 1

Batch interval: 10s, window interval: 180s, and slide interval: 30s
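
For reference, here is a rough sketch of how that configuration looks in code
(the app name is a placeholder and the master comes from spark-submit; this is
not our exact job setup):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: executor/extra cores as described above, 10s batch interval.
val conf = new SparkConf()
  .setAppName("windowed-kafka-job")
  .set("spark.executor.cores", "1")
  .set("spark.mesos.extra.cores", "1")

// The 180s window and 30s slide are applied on the DStream, not here.
val ssc = new StreamingContext(conf, Seconds(10))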

We see the exception when, on one executor, two task worker threads are
assigned the same Topic+Partition but different sets of offsets.

Both threads get the same CachedKafkaConsumer, and whichever task thread goes
first seeks and polls for all of its records, while the second thread tries to
seek to its own offset and fails because it cannot acquire the consumer's lock
(a minimal sketch of this race follows the timeline below).

Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z

Time1 E0 Task0 - Seeks and starts to poll
Time1 E0 Task1 - Attempts to seek, but fails
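
To make the race concrete, here is a minimal standalone sketch (my own
illustration, not the actual Spark code path; broker, topic, group and offset
are placeholders) of two threads sharing one KafkaConsumer. When the seek
overlaps with the poll, KafkaConsumer.acquire throws exactly this exception:

import java.util.{Arrays, Properties}
import java.util.concurrent.Executors

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object ConsumerRaceSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker
    props.put("group.id", "race-demo")                // placeholder group
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")

    // One consumer shared by two threads, like two task threads sharing a
    // single CachedKafkaConsumer for the same TopicPartition.
    val consumer = new KafkaConsumer[String, String](props)
    val tp = new TopicPartition("abc", 0)
    consumer.assign(Arrays.asList(tp))

    val pool = Executors.newFixedThreadPool(2)
    pool.submit(new Runnable {
      def run(): Unit = consumer.poll(1000) // "Task0": seeks/polls first
    })
    pool.submit(new Runnable {
      // "Task1": if this runs while the poll above is in flight, it fails with
      // java.util.ConcurrentModificationException: KafkaConsumer is not safe
      // for multi-threaded access
      def run(): Unit = consumer.seek(tp, 42L)
    })
    pool.shutdown()
  }
}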

Here are some relevant logs:
17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394204414 -> 4394238058
17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394238058 -> 4394257712
17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394204414
17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394238058
17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer: Initial fetch for spark-executor-consumer test-topic 2 4394238058
17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Seeking to test-topic-2 4394238058
17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting block rdd_199_2 failed due to an exception
17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block rdd_199_2 could not be removed as it was not found on disk or in memory
17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in task 49.0 in stage 45.0 (TID 3201)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Polled [test-topic-2]  8237
17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204415 requested 4394204415
17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204416 requested 4394204416
...

It looks like when WindowedDStream does the getOrCompute call, it computes all
the sets of offsets it needs and tries to farm the work out in parallel, so
each available worker thread gets a set of offsets to read (see the
illustrative offset ranges below).
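
To illustrate (the first two ranges are taken from the logs above, the third is
made up), a single window can produce several offset ranges for the same
TopicPartition, and with two task slots per executor two of them can be
computed at the same time:

import org.apache.spark.streaming.kafka010.OffsetRange

// Consecutive batches of the window all read from test-topic partition 2,
// each with its own offset range, each backed by the same cached consumer.
val rangesForWindow = Seq(
  OffsetRange("test-topic", 2, 4394204414L, 4394238058L),
  OffsetRange("test-topic", 2, 4394238058L, 4394257712L),
  OffsetRange("test-topic", 2, 4394257712L, 4394270000L) // hypothetical
)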

After realizing what was going on, I tested four configurations:

   - spark.executor.cores 1 and spark.mesos.extra.cores 0
      - No Exceptions
   - spark.executor.cores 1 and spark.mesos.extra.cores 1
      - ConcurrentModificationException
   - spark.executor.cores 2 and spark.mesos.extra.cores 0
      - ConcurrentModificationException
   - spark.executor.cores 2 and spark.mesos.extra.cores 1
      - ConcurrentModificationException


I'm not sure what the best solution to this is if we want to be able to have N
task threads read from the same TopicPartition to increase parallelism. You
could possibly allow N CachedKafkaConsumers for the same TopicPartition; a
rough sketch of that idea follows.
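
Something along these lines is what I have in mind; this is purely a
hypothetical sketch (the names and the cache key are made up, not how Spark's
CachedKafkaConsumer is keyed today):

import scala.collection.mutable

import org.apache.kafka.clients.consumer.KafkaConsumer

// Key the cache by (group, topic, partition, thread) instead of just
// (group, topic, partition), so concurrent tasks never share one consumer.
case class ConsumerKey(groupId: String, topic: String, partition: Int, threadId: Long)

object PerThreadConsumerCache {
  private val cache =
    mutable.HashMap.empty[ConsumerKey, KafkaConsumer[Array[Byte], Array[Byte]]]

  def getOrCreate(groupId: String, topic: String, partition: Int,
                  kafkaParams: java.util.Map[String, Object]): KafkaConsumer[Array[Byte], Array[Byte]] =
    cache.synchronized {
      val key = ConsumerKey(groupId, topic, partition, Thread.currentThread().getId)
      cache.getOrElseUpdate(key, new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams))
    }
}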

Any thoughts on this?

Thanks,
Kalvin

Re: [Streaming] ConcurrentModificationExceptions when Windowing

Posted by Kalvin Chau <ka...@gmail.com>.
I've filed an issue here https://issues.apache.org/jira/browse/SPARK-19185,
let me know if I missed anything!

--Kalvin

On Wed, Jan 11, 2017 at 5:43 PM Shixiong(Ryan) Zhu <sh...@databricks.com>
wrote:

> Thanks for reporting this. Finally I understood the root cause. Could you
> file a JIRA on https://issues.apache.org/jira/browse/SPARK please?
>

Re: [Streaming] ConcurrentModificationExceptions when Windowing

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Thanks for reporting this. Finally I understood the root cause. Could you
file a JIRA on https://issues.apache.org/jira/browse/SPARK please?

On Wed, Jan 11, 2017 at 5:20 PM, Kalvin Chau <ka...@gmail.com> wrote:

> Here is the minimal code example where I was able to replicate:
> Batch interval is set to 2 to get the exceptions to happen more often.
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> brokers,
>   "key.deserializer" -> classOf[KafkaAvroDeserializer],
>   "value.deserializer" -> classOf[KafkaAvroDeserializer],
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "group.id" -> groupId,
>   "schema.registry.url" -> schemaRegistryUrl,
>   "auto.offset.reset" -> offset
> )
>
> val inputStream = KafkaUtils.createDirectStream[Object, Object](
>   ssc,
>   PreferConsistent,
>   Subscribe[Object, Object]
>     (kafkaTopic, kafkaParams)
> )
>
> val windowStream = inputStream.map(_.toString).window(Seconds(180), Seconds(30))
>
> windowStream.foreachRDD{
>   rdd => {
>     val filtered = rdd.filter(_.contains("idb"))
>
>     filtered.foreach(
>       message => {
>         var i = 0
>         if (i == 0) {
>           logger.info(message)
>           i = i + 1
>         }
>       }
>     )
>   }
> }
>
>

Re: [Streaming] ConcurrentModificationExceptions when Windowing

Posted by Kalvin Chau <ka...@gmail.com>.
Here is the minimal code example with which I was able to replicate the issue.
The batch interval is set to 2 seconds to make the exceptions happen more often
(see the StreamingContext sketch after the snippet).

// Imports assumed for Spark 2.0.x with spark-streaming-kafka-0-10 and the
// Confluent Avro deserializer; brokers, groupId, schemaRegistryUrl, offset,
// kafkaTopic, ssc and logger are defined elsewhere.
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import io.confluent.kafka.serializers.KafkaAvroDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokers,
  "key.deserializer" -> classOf[KafkaAvroDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "group.id" -> groupId,
  "schema.registry.url" -> schemaRegistryUrl,
  "auto.offset.reset" -> offset
)

val inputStream = KafkaUtils.createDirectStream[Object, Object](
  ssc,
  PreferConsistent,
  Subscribe[Object, Object]
    (kafkaTopic, kafkaParams)
)

val windowStream = inputStream.map(_.toString).window(Seconds(180), Seconds(30))

windowStream.foreachRDD{
  rdd => {
    val filtered = rdd.filter(_.contains("idb"))

    filtered.foreach(
      message => {
        var i = 0
        if (i == 0) {
          logger.info(message)
          i = i + 1
        }
      }
    )
  }
}
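
For completeness, the 2 second batch interval above comes from the
StreamingContext construction, roughly like this ("conf" stands for our
SparkConf and is not part of the original snippet):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumed context setup for the reproduction; everything else is as above.
val ssc = new StreamingContext(conf, Seconds(2))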


On Wed, Jan 11, 2017 at 4:04 PM Shixiong(Ryan) Zhu <sh...@databricks.com>
wrote:

> Could you post your codes, please?
>

Re: [Streaming] ConcurrentModificationExceptions when Windowing

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Could you post your codes, please?

On Wed, Jan 11, 2017 at 3:53 PM, Kalvin Chau <ka...@gmail.com> wrote:

> "spark.speculation" is not set, so it would be whatever the default is.
>
>

Re: [Streaming] ConcurrentModificationExceptions when Windowing

Posted by Kalvin Chau <ka...@gmail.com>.
"spark.speculation" is not set, so it would be whatever the default is.


On Wed, Jan 11, 2017 at 3:43 PM Shixiong(Ryan) Zhu <sh...@databricks.com>
wrote:

> Or do you enable "spark.speculation"? If not, Spark Streaming should not
> launch two tasks using the same TopicPartition.
>

Re: [Streaming] ConcurrentModificationExceptions when Windowing

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Or do you enable "spark.speculation"? If not, Spark Streaming should not
launch two tasks using the same TopicPartition.

On Wed, Jan 11, 2017 at 3:33 PM, Kalvin Chau <ka...@gmail.com> wrote:

> I have not modified that configuration setting, and that doesn't seem to
> be documented anywhere.
>
> Does the Kafka 0.10 connector require the number of cores on an executor to
> be set to 1? I didn't see that documented anywhere either.
>

Re: [Streaming] ConcurrentModificationExceptions when Windowing

Posted by Kalvin Chau <ka...@gmail.com>.
I have not modified that configuration setting, and that doesn't seem to be
documented anywhere.

Does the Kafka 0.10 connector require the number of cores on an executor to be
set to 1? I didn't see that documented anywhere either.

On Wed, Jan 11, 2017 at 3:27 PM Shixiong(Ryan) Zhu <sh...@databricks.com>
wrote:

> Do you change "spark.streaming.concurrentJobs" to more than 1? Kafka 0.10
> connector requires it must be 1.

Re: [Streaming] ConcurrentModificationExceptions when Windowing

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Did you change "spark.streaming.concurrentJobs" to more than 1? The Kafka 0.10
connector requires that it stay at 1.
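
For anyone following along, "spark.streaming.concurrentJobs" is read from the
SparkConf; if it had been changed it would look something like the sketch
below (the value of 2 is only there to show what the Kafka 0.10 connector does
not support):

import org.apache.spark.SparkConf

// Hypothetical: this is the kind of setting the question is about. Leaving it
// unset keeps the default of 1, which is what the kafka010 connector expects.
val conf = new SparkConf()
  .set("spark.streaming.concurrentJobs", "2")  // > 1 is what would cause trouble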

On Wed, Jan 11, 2017 at 3:25 PM, Kalvin Chau <ka...@gmail.com> wrote:

> I'm not re-using any InputDStreams, actually; this is one InputDStream that
> has a window applied to it.
> Then, when Spark creates and assigns tasks to read from the topic, one
> executor gets assigned two tasks to read from the same TopicPartition, and
> both use the same CachedKafkaConsumer to read from it, causing the
> ConcurrentModificationException in one of the worker threads.

Re: [Streaming] ConcurrentModificationExceptions when Windowing

Posted by Kalvin Chau <ka...@gmail.com>.
I'm not re-using any InputDStreams, actually; this is one InputDStream that
has a window applied to it.
Then, when Spark creates and assigns tasks to read from the topic, one
executor gets assigned two tasks to read from the same TopicPartition, and
both use the same CachedKafkaConsumer to read from it, causing the
ConcurrentModificationException in one of the worker threads.
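
For context, a rough sketch of the kind of pipeline being described, assuming
the 10s batch / 180s window / 30s slide from the original post; the broker
list, topic, group id and the final action are placeholders:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("windowed-kafka-sketch")
val ssc = new StreamingContext(conf, Seconds(10))  // 10s batch interval

// Placeholder Kafka settings.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// A single direct stream...
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Array("test-topic"), kafkaParams))

// ...with a 180s window sliding every 30s applied to it. The windowed RDD is a
// union of the underlying KafkaRDDs, which is where one executor can end up
// with two tasks covering different offset ranges of the same TopicPartition.
val windowed = stream.map(r => (r.key, r.value)).window(Seconds(180), Seconds(30))
windowed.foreachRDD(rdd => println(rdd.count()))

ssc.start()
ssc.awaitTermination()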

On Wed, Jan 11, 2017 at 2:53 PM Shixiong(Ryan) Zhu <sh...@databricks.com>
wrote:

I think you may be reusing the Kafka DStream (the DStream returned by
createDirectStream). If you need to read from the same Kafka source more than
once, you need to create another DStream.

Re: [Streaming] ConcurrentModificationExceptions when Windowing

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
I think you may be reusing the Kafka DStream (the DStream returned by
createDirectStream). If you need to read from the same Kafka source more than
once, you need to create another DStream.
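
If the goal really is to consume the same topic in two separate parts of the
job, a sketch of "create another DStream" might look like the following; the
topic, brokers and group ids are placeholders, and giving each stream its own
group.id is my assumption for keeping their cached consumers apart on the
executors:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = new StreamingContext(new SparkConf().setAppName("two-streams-sketch"), Seconds(10))

// Placeholder Kafka settings; each stream gets its own group id.
def kafkaParams(groupId: String): Map[String, Object] = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> groupId,
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Two independent DStreams over the same topic, instead of reusing one DStream
// in two places.
val streamA = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Array("test-topic"), kafkaParams("group-a")))
val streamB = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Array("test-topic"), kafkaParams("group-b")))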
