Posted to issues@beam.apache.org by "Minbo Bae (Jira)" <ji...@apache.org> on 2021/07/15 14:52:00 UTC

[jira] [Updated] (BEAM-12618) Check state failed at IsmSideInputReader.createReadersFromSources

     [ https://issues.apache.org/jira/browse/BEAM-12618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Minbo Bae updated BEAM-12618:
-----------------------------
    Description: 
If a pipeline has a transform with a side input and you call `pipeline.run()` twice to create two Dataflow jobs, the second job fails in the Dataflow worker with the runtime error below.
{code}
Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException: IsmSideInputReader only supports IsmReader as a reader but was AvroByteReader.
        org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.lambda$getSideInputReader$0(BatchModeExecutionContext.java:297)
        org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers$NonSerializableMemoizingSupplier.get(Suppliers.java:167)
        org.apache.beam.runners.dataflow.worker.LazilyInitializedSideInputReader.get(LazilyInitializedSideInputReader.java:50)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:267)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$1200(SimpleDoFnRunner.java:84)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:395)
        org.apache.beam.sdk.transforms.Contextful$Fn$Context$ContextFromProcessContext.sideInput(Contextful.java:100)
        baeminbo.DoubleRunSideinputPipeline.lambda$main$38d062d6$1(DoubleRunSideinputPipeline.java:34)
        org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:143)
Caused by: java.lang.IllegalStateException: IsmSideInputReader only supports IsmReader as a reader but was AvroByteReader.
        org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
        org.apache.beam.runners.dataflow.worker.IsmSideInputReader.createReadersFromSources(IsmSideInputReader.java:253)
        org.apache.beam.runners.dataflow.worker.IsmSideInputReader.<init>(IsmSideInputReader.java:165)
        org.apache.beam.runners.dataflow.worker.IsmSideInputReader.of(IsmSideInputReader.java:278)
        org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.lambda$getSideInputReader$0(BatchModeExecutionContext.java:290)
        org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers$NonSerializableMemoizingSupplier.get(Suppliers.java:167)
        org.apache.beam.runners.dataflow.worker.LazilyInitializedSideInputReader.get(LazilyInitializedSideInputReader.java:50)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:267)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$1200(SimpleDoFnRunner.java:84)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:395)
        org.apache.beam.sdk.transforms.Contextful$Fn$Context$ContextFromProcessContext.sideInput(Contextful.java:100)
        baeminbo.DoubleRunSideinputPipeline.lambda$main$38d062d6$1(DoubleRunSideinputPipeline.java:34)
        org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:143)
        org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
        org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:339)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
        org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:420)
        org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:389)
        org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
        org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
        org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
        org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
        java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:834)

{code}
You can reproduce this issue with [this sample pipeline|https://github.com/baeminbo/dataflow-pipelines/tree/master/double-run-sideinput/].
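
For anyone reading the trace: the same message shows up twice because the outer frame appears to rethrow the `IllegalStateException` wrapped in a `RuntimeException`, and `new RuntimeException(cause)` uses `cause.toString()` as its own message. Below is a rough stdlib-only sketch of that shape. The types `Reader`, `IsmReader`, `AvroByteReader` and the helpers `requireIsmReader` and `lazily` are hypothetical stand-ins named after what the trace mentions; they are not the Dataflow worker's real classes, and the sketch does not reproduce the bug itself.

```java
import java.util.function.Supplier;

// Toy model only: hypothetical stand-ins, NOT the Dataflow worker's implementation.
final class SideInputCheckSketch {
  interface Reader {}

  static final class IsmReader implements Reader {}

  static final class AvroByteReader implements Reader {}

  // A checkState-style guard: reject any reader that is not an IsmReader.
  static void requireIsmReader(Reader reader) {
    if (!(reader instanceof IsmReader)) {
      throw new IllegalStateException(
          "IsmSideInputReader only supports IsmReader as a reader but was "
              + reader.getClass().getSimpleName()
              + ".");
    }
  }

  // Defers the check until first use and wraps any failure in a RuntimeException,
  // which is what the outermost frame in the trace appears to do (an assumption).
  static Supplier<Reader> lazily(Reader reader) {
    return () -> {
      try {
        requireIsmReader(reader);
        return reader;
      } catch (Exception e) {
        // RuntimeException(Throwable) copies cause.toString() into its message.
        throw new RuntimeException(e);
      }
    };
  }

  public static void main(String[] args) {
    try {
      lazily(new AvroByteReader()).get();
    } catch (RuntimeException e) {
      System.out.println(e.getMessage());
      System.out.println("Caused by: " + e.getCause());
    }
  }
}
```

Running `main` prints the `IllegalStateException` text once as the wrapper's message and once after `Caused by:`, matching the two-part layout of the worker error above. The frames under `Caused by:` are the ones that point at the actual check.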


> Check state failed at IsmSideInputReader.createReadersFromSources
> -----------------------------------------------------------------
>
>                 Key: BEAM-12618
>                 URL: https://issues.apache.org/jira/browse/BEAM-12618
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Minbo Bae
>            Priority: P2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)