You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Bruno Volpato da Cunha (Jira)" <ji...@apache.org> on 2022/05/03 02:11:00 UTC

[jira] [Created] (BEAM-14394) BigQueryIO not thread safe, may fail with ConcurrentModificationException

Bruno Volpato da Cunha created BEAM-14394:
---------------------------------------------

             Summary: BigQueryIO not thread safe, may fail with ConcurrentModificationException
                 Key: BEAM-14394
                 URL: https://issues.apache.org/jira/browse/BEAM-14394
             Project: Beam
          Issue Type: Bug
          Components: io-java-gcp
    Affects Versions: 2.38.0
            Reporter: Bruno Volpato da Cunha


I am using the Java SDK with a ParDo, in which the code has some parallelism (basically, a parallel Stream over an Iterable).

This ParDo is outputting to a side output that is redirected to BigQuery, using Streaming Inserts, with the following parameters:

 
{code:java}
return BigQueryIO.<T>write()
    .to(dynamicDestinations)
    .withMethod(Write.Method.STREAMING_INSERTS)
    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
    .withCustomGcsTempLocation(StaticValueProvider.of(gcsTempLocation))
    .withFormatFunction(formatFunction)
    .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
    .withExtendedErrorInfo(); {code}
 

 

Sometimes, the following error is observed:

 
{code:java}
org.apache.beam.sdk.util.UserCodeException: java.util.ConcurrentModificationException
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
	at org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
	at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:85)
	at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
	at org.apache.beam.sdk.transforms.FlatMapElements$2.processElement(FlatMapElements.java:156)
	at org.apache.beam.sdk.transforms.FlatMapElements$2$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
	at <<Some Parallel Custom Code.java>>
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
	at com.totvslabs.mdm.pipeline.cds.writer.CarolStagingParquetWriter$1.lambda$processElement$2(CarolStagingParquetWriter.java:233)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:952)
	at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:926)
	at java.base/java.util.stream.AbstractTask.compute(AbstractTask.java:327)
	at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408)
	at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:919)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
	at com.totvslabs.mdm.pipeline.cds.writer.CarolStagingParquetWriter$1.processElement(CarolStagingParquetWriter.java:237)
	at com.totvslabs.mdm.pipeline.cds.writer.CarolStagingParquetWriter$1$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:268)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:416)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:404)
	at com.totvslabs.mdm.pipeline.cds.component.CarolStagingParse$1.processElement(CarolStagingParse.java:66)
	at com.totvslabs.mdm.pipeline.cds.component.CarolStagingParse$1$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:185)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1058)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1061)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:373)
	at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
	at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1445)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1120)
	at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.ConcurrentModificationException
	at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134)
	at org.apache.beam.sdk.io.gcp.bigquery.CreateTables$CreateTablesFn.processElement(CreateTables.java:104)
" {code}
 

 

Unfortunately, I was trying to come up with a toy example to post here but could not reproduce it there.

 

When investigating the class CreateTables, it instantiates one _Map<DestinationT, TableDestination> destinations = Maps.newHashMap()_ ** per bundle, and _processElement_ can happen concurrently, following the user's code.

The current implementation for the Map is a HashMap. I believe that either synchronizing the _destinations.computeIfAbsent_ call or simply initializing it with Maps.newConcurrentMap() would be enough.

 

I am open to implementing this change and leaning towards just moving to a ConcurrentHashMap, so just checking if I am not missing any detail.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)