Posted to user@beam.apache.org by wang Wu <fa...@gmail.com> on 2020/06/28 12:43:39 UTC

Concurrency issue with KafkaIO

Hi,
We run a Beam pipeline on Spark in streaming mode. We subscribe to multiple Kafka topics. Our job runs fine until it comes under heavy load: millions of Kafka messages arriving per second. The exceptions look like a concurrency issue. Is this a known bug in Beam, or is there some Spark configuration we could set to avoid it?
Our code roughly looks like this.
For KafkaIO:
input
        .getPipeline()
        .apply(
            "ReadFromKafka",
            KafkaIO.readBytes()
                .withBootstrapServers(XXX)
                .withTopics(YYY)
                .withConsumerConfigUpdates(
                    ImmutableMap.of(
                        "group.id",
                        "ZZZ"))
                .withReadCommitted()          // read only messages from committed transactions
                .commitOffsetsInFinalize())   // commit consumed offsets back to Kafka on bundle finalize
        .apply(
            "AAA",
            ParDo.of(
                    KafkaRecordToFeatureRowDoFn.newBuilder()
                        .setSuccessTag(getSuccessTag())
                        .setFailureTag(getFailureTag())
                        .build())
                .withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag())));
Running Pipeline with Spark Runner:
PipelineResult result = pipeline.run();
result.waitUntilFinish();
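
The pipeline itself is created with the usual Spark runner options, roughly like this (a sketch only, with illustrative values, not our exact settings):

// Sketch only: how the Pipeline above is created for the Spark runner.
SparkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setStreaming(true);   // unbounded Kafka sources, so streaming mode

Pipeline pipeline = Pipeline.create(options);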

--
Exceptions:

20/06/26 18:45:28 WARN KafkaUnboundedSource: Reader-3: ignoring already consumed offset 495100013 for gmb-featurestore-FeatureRow-global___HK-1
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to exception java.lang.IllegalStateException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it was not found on disk or in memory
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to exception java.lang.IllegalStateException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as it was not found on disk or in memory
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_87_3 failed due to exception java.lang.IllegalStateException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_87_3 could not be removed as it was not found on disk or in memory
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_93_3 failed due to exception java.lang.IllegalStateException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_93_3 could not be removed as it was not found on disk or in memory
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_98_3 failed due to exception java.lang.IllegalStateException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_98_3 could not be removed as it was not found on disk or in memory
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_102_3 failed due to exception java.lang.IllegalStateException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_102_3 could not be removed as it was not found on disk or in memory
20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 64.0 (TID 203)
java.lang.IllegalStateException
	at java.util.ArrayList$Itr.remove(ArrayList.java:872)
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.remove(Iterators.java:423)
	at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:172)
	at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
	at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
	at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
	at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
	at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
	at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:153)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
20/06/26 18:45:28 INFO CoarseGrainedExecutorBackend: Got assigned task 204
20/06/26 18:45:28 INFO Executor: Running task 3.1 in stage 64.0 (TID 204)
20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks including 0 local blocks and 0 remote blocks
20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks including 0 local blocks and 0 remote blocks
20/06/26 18:45:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
20/06/26 18:45:28 INFO StateSpecFunctions: No CheckpointMark provided, start reading from default.
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to exception java.util.ConcurrentModificationException.
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_1410_3 failed due to exception java.util.ConcurrentModificationException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it was not found on disk or in memory
20/06/26 18:45:28 WARN BlockManager: Block rdd_1410_3 could not be removed as it was not found on disk or in memory
20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to exception java.util.ConcurrentModificationException.
20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as it was not found on disk or in memory
20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 61.0 (TID 202)
java.util.ConcurrentModificationException
	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
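
For what it is worth, both exceptions above originate in java.util.ArrayList$Itr, whose iterators are fail-fast under unsynchronized concurrent access. A tiny standalone Java sketch of that mechanism (our assumption about what is happening, not Beam code):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class IteratorRaceSketch {
  public static void main(String[] args) throws InterruptedException {
    List<Integer> list = new ArrayList<>();
    for (int i = 0; i < 1_000_000; i++) {
      list.add(i);
    }
    // One thread iterates while another mutates the same ArrayList.
    // Because ArrayList iterators are fail-fast, the reader thread
    // typically dies with ConcurrentModificationException, or with
    // IllegalStateException / NoSuchElementException depending on
    // how the two threads interleave.
    Thread reader =
        new Thread(
            () -> {
              Iterator<Integer> it = list.iterator();
              while (it.hasNext()) {
                it.next();
              }
            });
    Thread writer = new Thread(() -> list.remove(0));
    reader.start();
    writer.start();
    reader.join();
    writer.join();
  }
}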


Re: Concurrency issue with KafkaIO

Posted by Alexey Romanenko <ar...@gmail.com>.
Tbh, I don’t see why it would improve anything if these two pipeline variants use the same number of slots and resources.

> On 3 Jul 2020, at 11:49, wang Wu <fa...@gmail.com> wrote:
> 
> One question: we have 5 topics and we are adding them all to KafkaIO. Would it help the throughput of the pipeline if:
> we added only 1 topic per KafkaIO read and created 5 (unbounded) PCollections? Each collection would go through the same transform and write to the same final sink.
> 
> Regards
> Dinh
> 
>> On 30 Jun BE 2563, at 23:53, wang Wu <farangbk@gmail.com> wrote:
>> 
>> [message and stack trace snipped; quoted in full later in this thread]


Re: Concurrency issue with KafkaIO

Posted by wang Wu <fa...@gmail.com>.
One question: we have 5 topics and we are adding them all to KafkaIO. Would it help the throughput of the pipeline if:
we added only 1 topic per KafkaIO read and created 5 (unbounded) PCollections? Each collection would go through the same transform and write to the same final sink.
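
In code, the variant I am asking about is roughly this (a sketch only: the topic names are illustrative, XXX/ZZZ are the same placeholders as in my first mail, pipeline is the same Pipeline object, and sameParDoAsAbove stands for the ParDo shown there):

// Hypothetical per-topic variant: five independent KafkaIO reads,
// flattened into one PCollection before the shared transform and sink.
PCollectionList<KafkaRecord<byte[], byte[]>> perTopic = PCollectionList.empty(pipeline);
for (String topic : ImmutableList.of("topic1", "topic2", "topic3", "topic4", "topic5")) {
  perTopic =
      perTopic.and(
          pipeline.apply(
              "ReadFromKafka_" + topic,
              KafkaIO.readBytes()
                  .withBootstrapServers(XXX)
                  .withTopic(topic)  // a single topic per read
                  .withConsumerConfigUpdates(ImmutableMap.of("group.id", "ZZZ"))
                  .withReadCommitted()
                  .commitOffsetsInFinalize()));
}
perTopic
    .apply(Flatten.pCollections())  // merge the five unbounded collections
    .apply("AAA", sameParDoAsAbove);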

Regards
Dinh

> On 30 Jun BE 2563, at 23:53, wang Wu <fa...@gmail.com> wrote:
> 
> We encountered a similar exception with KafkaUnboundedReader. By similar I mean it starts from
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint
> 
> and ends at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance.
> Just another type of concurrency bug.
> 
> I am sorry for the long stack trace.
> My questions are:
> 1. Is the code of KafkaUnboundedReader thread-safe? Especially this line:
> curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
> 2. What really happens behind the scenes? I mean, which parallel tasks share the same reader? How does it relate to MicrobatchSource?
> 
> Regards
> Dinh
> 
> 20/06/30 21:12:26 INFO ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.util.NoSuchElementException
> 	at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
> 	at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
> 	at org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:184)
> 	at org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:219)
> 	at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
> 	at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:90)
> 	at feast.ingestion.ImportJobOnSpark.runPipeline(ImportJobOnSpark.java:199)
> 	at feast.ingestion.ImportJobOnSpark.main(ImportJobOnSpark.java:77)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
> Caused by: java.util.NoSuchElementException
> 	at java.util.ArrayList$Itr.next(ArrayList.java:862)
> 	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.next(Iterators.java:418)
> 	at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:168)
> 	at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
> 	at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
> 	at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
> 	at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
> 	at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
> 	at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
> 	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
> 	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
> 	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
> 	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
> 	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
> 	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
> 	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
> 	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
> 	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
> 	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
> 	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:123)
> 	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> 
> 
>> On 30 Jun BE 2563, at 00:26, Alexey Romanenko <aromanenko.dev@gmail.com> wrote:
>> 
>> I don’t think it’s a known issue. Could you tell which version of Beam you use?
>> 
>>> On 28 Jun 2020, at 14:43, wang Wu <farangbk@gmail.com> wrote:
>>> 
>>> [original message and stack trace snipped; see the first post in this thread]


Re: Concurrency issue with KafkaIO

Posted by wang Wu <fa...@gmail.com>.
Thank you for the information. Here is our Kafka client version:

[INFO] +- org.apache.kafka:kafka-clients:jar:2.3.0:compile
[INFO] |  +- com.github.luben:zstd-jni:jar:1.4.0-1:compile
[INFO] |  +- org.lz4:lz4-java:jar:1.6.0:compile
[INFO] |  \- org.xerial.snappy:snappy-java:jar:1.1.7.3:compile

I agree that this has something to do with tuning Spark Streaming. I doubled the number of executors and added options such as:

driverMemory: 2G
executorMemory: 4G
driverCores: 2
executorCores: 4
numExecutors: 8
spark.driver.memoryOverhead=2048
spark.executor.memoryOverhead=4096
spark.streaming.blockInterval = 5000 (the default is 200ms)
So far my job has not been killed by the concurrency bug anymore; only the streaming lag is increasing, and I will have to fix that with more fine-tuning.
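
For reference, assuming the job is submitted with spark-submit on a YARN cluster (the earlier stack trace shows org.apache.spark.deploy.yarn.ApplicationMaster; the jar name below is a placeholder), these settings map onto flags roughly like this:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2G \
  --executor-memory 4G \
  --driver-cores 2 \
  --executor-cores 4 \
  --num-executors 8 \
  --conf spark.driver.memoryOverhead=2048 \
  --conf spark.executor.memoryOverhead=4096 \
  --conf spark.streaming.blockInterval=5000ms \
  --class feast.ingestion.ImportJobOnSpark \
  our-ingestion-job.jar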

Regards
Dinh

> On 3 Jul BE 2563, at 00:46, Alexey Romanenko <ar...@gmail.com> wrote:
> 
> KafkaUnboundedReader is not thread-safe and, maybe I’m wrong, but I don’t think it’s supposed to be, since every KafkaUnboundedReader is supposed to read from its own split, represented by a KafkaUnboundedSource, independently.
> 
> Though, in the KafkaIO case, if the total number of splits is less than the number of topic partitions to read, then one or more readers will read from more than one partition. However, that is done in a cycle, not in parallel.
> 
> To read messages and offsets, KafkaUnboundedReader uses two Kafka consumers, spawned in different threads. So we need to check that there is no issue with that under high load.
> 
> Btw, which Kafka client version do you use?
> 
>> On 30 Jun 2020, at 18:53, wang Wu <farangbk@gmail.com> wrote:
>> 
>> We encountered a similar exception with KafkaUnboundedReader. By similar I mean it starts from
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint
>> 
>> And it ends at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance.
>> Just another type of concurrency bug.
>> 
>> I am sorry for the long stack trace.
>> My questions are:
>> 1. Is the code of KafkaUnboundedReader thread-safe? Especially this code:
>> curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
>> 2. What really happens behind the scenes? I mean, which parallel tasks share the same reader? How does it relate to MicrobatchSource?
>> 
>> Regards
>> Dinh
>> 
>> [... NoSuchElementException stack trace snipped; the full trace appears in the message below ...]
>> 
>> 
>>> On 30 Jun BE 2563, at 00:26, Alexey Romanenko <aromanenko.dev@gmail.com> wrote:
>>> 
>>> I don’t think it’s a known issue. Could you tell which version of Beam you use?
>>> 
>>>> On 28 Jun 2020, at 14:43, wang Wu <farangbk@gmail.com> wrote:
>>>> 
>>>> [... original message and stack traces snipped ...]
>>>> 
>>> 
>> 
> 


Re: Concurrency issue with KafkaIO

Posted by Alexey Romanenko <ar...@gmail.com>.
KafkaUnboundedReader is not thread-safe and, maybe I’m wrong, but I don’t think it’s supposed to be, since every KafkaUnboundedReader is supposed to read from its own split, represented by a KafkaUnboundedSource, independently.

Though, in the KafkaIO case, if the total number of splits is less than the number of topic partitions to read, then one or more readers will read from more than one partition. However, that is done in a cycle, not in parallel.
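
Roughly, the assignment works like this (a simplified sketch of the idea only, not the actual KafkaUnboundedSource code):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.TopicPartition;

class SplitSketch {
  // Sketch: partitions are dealt out round-robin over the requested number
  // of splits, so with numSplits < partitions.size() a single reader owns
  // several partitions and cycles over them sequentially while advancing.
  static List<List<TopicPartition>> assignPartitionsToSplits(
      List<TopicPartition> partitions, int numSplits) {
    List<List<TopicPartition>> splits = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<TopicPartition>());
    }
    for (int i = 0; i < partitions.size(); i++) {
      splits.get(i % numSplits).add(partitions.get(i));
    }
    return splits;
  }
}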

To read messages and offsets, KafkaUnboundedReader uses two Kafka consumers, spawned in different threads. So we need to check that there is no issue with that under high load.

Btw, which Kafka client version do you use?

> On 30 Jun 2020, at 18:53, wang Wu <fa...@gmail.com> wrote:
> 
> We encountered a similar exception with KafkaUnboundedReader. By similar I mean it starts from
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint
> 
> And it ends at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance.
> Just another type of concurrency bug.
> 
> I am sorry for the long stack trace.
> My questions are:
> 1. Is the code of KafkaUnboundedReader thread-safe? Especially this code:
> curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
> 2. What really happens behind the scenes? I mean, which parallel tasks share the same reader? How does it relate to MicrobatchSource?
> 
> Regards
> Dinh
> 
> [... NoSuchElementException stack trace snipped; the full trace appears in the message below ...]
> 
> 
>> On 30 Jun BE 2563, at 00:26, Alexey Romanenko <aromanenko.dev@gmail.com> wrote:
>> 
>> I don’t think it’s a known issue. Could you tell which version of Beam you use?
>> 
>>> On 28 Jun 2020, at 14:43, wang Wu <farangbk@gmail.com> wrote:
>>> 
>>> [... original message and stack traces snipped ...]
>>> 
>> 
> 


Re: Concurrency issue with KafkaIO

Posted by wang Wu <fa...@gmail.com>.
We encountered a similar exception with KafkaUnboundedReader. By similar I mean it starts from
org.apache.spark.rdd.RDD.computeOrReadCheckpoint

And it ends at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance.
Just another type of concurrency bug.

I am sorry for the long stack trace.
My questions are:
1. Is the code of KafkaUnboundedReader thread-safe? Especially this code:
curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
2. What really happens behind the scenes? I mean, which parallel tasks share the same reader? How does it relate to MicrobatchSource?
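
To make question 1 concrete, here is a standalone sketch (my own repro attempt, not Beam code; it uses Guava's Iterators.cycle over a plain ArrayList) with two threads sharing one cycled iterator:

import com.google.common.collect.Iterators;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class CycleRace {
  public static void main(String[] args) throws InterruptedException {
    List<Integer> partitionStates = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      partitionStates.add(i);
    }
    // One shared view, as in curBatch = Iterators.cycle(...).
    final Iterator<Integer> curBatch = Iterators.cycle(partitionStates);

    Runnable advance = () -> {
      try {
        while (curBatch.hasNext()) {
          curBatch.next();
          curBatch.remove(); // structural modification racing with next()
        }
      } catch (RuntimeException e) {
        // Depending on the interleaving, this can be
        // ConcurrentModificationException, IllegalStateException or
        // NoSuchElementException -- the exceptions we see in the traces.
        System.out.println(Thread.currentThread().getName() + ": " + e);
      }
    };
    Thread t1 = new Thread(advance, "reader-a");
    Thread t2 = new Thread(advance, "reader-b");
    t1.start();
    t2.start();
    t1.join();
    t2.join();
  }
}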

Regards
Dinh

20/06/30 21:12:26 INFO ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.util.NoSuchElementException
	at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
	at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
	at org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:184)
	at org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:219)
	at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
	at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:90)
	at feast.ingestion.ImportJobOnSpark.runPipeline(ImportJobOnSpark.java:199)
	at feast.ingestion.ImportJobOnSpark.main(ImportJobOnSpark.java:77)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
Caused by: java.util.NoSuchElementException
	at java.util.ArrayList$Itr.next(ArrayList.java:862)
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.next(Iterators.java:418)
	at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:168)
	at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
	at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
	at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
	at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
	at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
	at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)


> On 30 Jun BE 2563, at 00:26, Alexey Romanenko <ar...@gmail.com> wrote:
> 
> I don’t think it’s a known issue. Could you tell which version of Beam you use?
> 
>> On 28 Jun 2020, at 14:43, wang Wu <fa...@gmail.com> wrote:
>> 
>> [... original message and stack traces snipped ...]
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to exception java.util.ConcurrentModificationException.
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_1410_3 failed due to exception java.util.ConcurrentModificationException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it was not found on disk or in memory
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_1410_3 could not be removed as it was not found on disk or in memory
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to exception java.util.ConcurrentModificationException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as it was not found on disk or in memory
>> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 61.0 (TID 202)
>> java.util.ConcurrentModificationException
>> 	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
>> 
> 


Re: Concurrency issue with KafkaIO

Posted by wang Wu <fa...@gmail.com>.
Hi,

We are using version 2.16.0. More about our dependencies:

[INFO] +- org.apache.beam:beam-sdks-java-core:jar:2.16.0:compile
[INFO] |  +- org.apache.beam:beam-model-job-management:jar:2.16.0:compile
[INFO] |  +- org.apache.beam:beam-vendor-bytebuddy-1_9_3:jar:0.1:compile
[INFO] |  \- org.tukaani:xz:jar:1.8:compile
[INFO] +- org.apache.beam:beam-sdks-java-io-kafka:jar:2.16.0:compile
[INFO] |  \- org.springframework:spring-expression:jar:5.0.13.RELEASE:compile
[INFO] |     \- org.springframework:spring-core:jar:5.0.13.RELEASE:compile
[INFO] |        \- org.springframework:spring-jcl:jar:5.0.13.RELEASE:compile

[INFO] +- org.apache.beam:beam-runners-spark:jar:2.16.0:compile
[INFO] |  +- org.apache.beam:beam-runners-core-java:jar:2.16.0:compile
[INFO] |  |  +- org.apache.beam:beam-model-fn-execution:jar:2.16.0:compile
[INFO] |  |  \- org.apache.beam:beam-sdks-java-fn-execution:jar:2.16.0:compile
[INFO] |  \- org.apache.beam:beam-runners-java-fn-execution:jar:2.16.0:compile
[INFO] |     \- org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:jar:2.16.0:compile

When building the Kafka input source, we subscribe to multiple topics, i.e. .withTopics(list of topics). On second thought, it might be a memory issue: before the job got killed, I saw the Spark logs at some point say that the executor was out of memory. Let me observe more and get back.
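
For reference, here is a minimal, self-contained sketch of such a multi-topic read, with consumer config caps that bound how much data a single poll can pull into memory. The broker addresses, topic names, group id, and cap values below are hypothetical placeholders, not our production settings:

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MultiTopicKafkaRead {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline.apply(
        "ReadFromKafka",
        KafkaIO.readBytes()
            // hypothetical brokers and topics, for illustration only
            .withBootstrapServers("broker1:9092,broker2:9092")
            .withTopics(Arrays.asList("topic-a", "topic-b", "topic-c"))
            .withConsumerConfigUpdates(
                ImmutableMap.<String, Object>of(
                    "group.id", "my-consumer-group",
                    // bound the number of records returned by a single poll()
                    "max.poll.records", 250,
                    // bound the bytes fetched per request (Kafka's default is 50 MB)
                    "fetch.max.bytes", 16 * 1024 * 1024))
            .withReadCommitted()
            .commitOffsetsInFinalize());

    pipeline.run().waitUntilFinish();
  }
}

If the executor really is running out of memory, raising executor memory on the Spark side (e.g. spark-submit --executor-memory) and lowering fetch caps like the ones above are the first two knobs we plan to try.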

Regards
Din 

> On 30 Jun 2020, at 00:26, Alexey Romanenko <ar...@gmail.com> wrote:
> 
> I don’t think it’s a known issue. Could you tell which version of Beam you use?
> 
>> On 28 Jun 2020, at 14:43, wang Wu <fa...@gmail.com> wrote:
>> 
>> [original message and stack trace snipped]
> 


Re: Concurrency issue with KafkaIO

Posted by Alexey Romanenko <ar...@gmail.com>.
I don’t think it’s a known issue. Could you tell which version of Beam you use?

> On 28 Jun 2020, at 14:43, wang Wu <fa...@gmail.com> wrote:
> 
> [original message and stack trace snipped]