Posted to issues@beam.apache.org by "Minbo Bae (Jira)" <ji...@apache.org> on 2021/07/15 15:02:00 UTC

[jira] [Comment Edited] (BEAM-12618) Check state failed at IsmSideInputReader.createReadersFromSources

    [ https://issues.apache.org/jira/browse/BEAM-12618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17381398#comment-17381398 ] 

Minbo Bae edited comment on BEAM-12618 at 7/15/21, 3:01 PM:
------------------------------------------------------------

Comparing the job graphs of the first and second runs, I saw that the first run had `use_indexed_format` set to true, while the second run had no such field. I guess this affects the type of Reader used for the side input.

{code:json}
"output_info": [
          {
            "output_name": {
              "@type": "http://schema.org/Text",
              "value": "org.apache.beam.sdk.values.PCollection.<init>:408#86d5c620b9cc619d"
            },
            "use_indexed_format": {
              "@type": "http://schema.org/Boolean",
              "value": true
            },
            "user_name": {
              "@type": "http://schema.org/Text",
              "value": "Output1/ParDo(ToIsmRecordForGlobalWindow).out0"
            },
            "encoding": {
              "@type": "kind:windowed_value",
              "is_wrapper": {
                "@type": "http://schema.org/Boolean",
                "value": true
              },
              "component_encodings": [
                {
                  "@type": "org.apache.beam.sdk.coders.CustomCoder",
                  "type": {
                    "@type": "http://schema.org/Text",
                    "value": "org.apache.beam.runners.dataflow.internal.IsmFormat$IsmRecordCoder"
                  },
                  "serialized_coder": {
                    "@type": "http://schema.org/Text",
                    "value": "%82SNAPPY%00%00%00%00%01%00%00%00%01%00%00%03(%e5%09%f0y%ac%ed%00%05sr%00Borg.apache.beam.runners.dataflow.internal.IsmFormat$IsmRecordCoder%b8%f0:%25%cbR%bc%db%02%00%04I%00%1enumberOfMetadataShardKeyCodersI%00%16n%0d!%04Sh.%19%00%0cL%00%12k%01%25 mponentCo%01.ht%00%10Ljava/util/List;L%00%0avalue%05! t%00%22Lorg/a%05%c0%00/%01%c0%14/sdk/c%05>%00/%05$%10;xr%00&>%e5%00%0csdk.%09%25%18.Custom%05+8j%b0%08%9d%0b;%1d%0b%02%00%00xr%00 n5%00%01a%1cC%dd%d5%89%ae%bc~%f8%01/%04p%00%09%01%10%01sr%00]>9%00|vendor.guava.v26_0_jre.com.googl%05%0bPmon.collect.Immutable%01%f3($Serialized!%7f%09hL%00%00%02%00%01[%00%08elementst%00%13[)+<lang/Object;xpur%11%19%00.%01%19%00.%0d%19%1c%90%ceX%9f%10s)l%11%bb%10%02sr%00;>%b7%00!%25ltransforms.windowing.GlobalW%05%10%04$C!%0b%1c3|%aa%f4%14%1e%87%c3%01P%08r%00*NJ%00-o StructureI<%1cs%bf%12%0e%d5%d46%11%019 q%00~%00%04sr%00-j?%000BigEndianLong%05{%1c%daI%95%ca%ee%c6%0e:%01Bv%ea%01%14Atomic%055%1c%c7%ec%b5%cc%85tPF%015%01w%10%0csr%00=Nw%00Ay%00.%09%ea(edValue$Ful%0d%fc%0d%12%05R4%89%1c-%ad%b5|%0d%bf%02%00%02L%00%0b),%05%19%01_%0c%02L%00%0c%09%13%00s%19%14%0cxr%009%9as%002%81%00%05B(%8d~v%cdC|%d3%f0%02%00%012%12%03%05n%15%d3%00&N`%00-%89%0cVarImh%1c%c0K2%da%a9KVh1%0e%00%0f%01F%04%0dsv%c9%01%00CI%ac%08ion%05%91%1c%8e%c7%f8%b7%1bi.%b0%01D%08r%00,jy%00%0cIterI%dc%00k%09%cc%1c]%5cn!%f1%8d%eb%0b%25;%00%0cM%d2%05U%01%88!<%00i%0d3%18Namet%00%12]%e9%18String;%09%f2%01/4%0dt%00%0aCollection"
                  }
                },
                {
                  "@type": "kind:global_window"
                }
              ]
            }
          }
        ]
{code}
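The guess above can be illustrated with a toy, JDK-only Java model. It is not the Dataflow worker code: `SideInputSpec`, `ReaderKind`, and `SideInputModel` are hypothetical names, and the idea that a missing `use_indexed_format` leads to an Avro-based reader is only the assumption made in this comment. The model mimics the precondition that fails in `IsmSideInputReader.createReadersFromSources`, reusing the error text from the stack trace in the issue description.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one side-input entry of "output_info" in the job graph.
final class SideInputSpec {
    final String userName;
    final boolean useIndexedFormat; // true only when "use_indexed_format" is present and true

    SideInputSpec(String userName, boolean useIndexedFormat) {
        this.userName = userName;
        this.useIndexedFormat = useIndexedFormat;
    }
}

// Hypothetical reader kinds; the class names only mirror the worker's error message.
enum ReaderKind {
    ISM("IsmReader"),
    AVRO_BYTE("AvroByteReader");

    final String className;

    ReaderKind(String className) {
        this.className = className;
    }
}

final class SideInputModel {
    // Assumption (the guess in this comment): the indexed-format flag decides the reader type.
    static ReaderKind createReader(SideInputSpec spec) {
        return spec.useIndexedFormat ? ReaderKind.ISM : ReaderKind.AVRO_BYTE;
    }

    // Mimics the failing precondition: every reader must be an Ism reader.
    static void checkIsmOnly(List<ReaderKind> readers) {
        for (ReaderKind reader : readers) {
            if (reader != ReaderKind.ISM) {
                throw new IllegalStateException(
                        "IsmSideInputReader only supports IsmReader as a reader but was "
                                + reader.className + ".");
            }
        }
    }

    // Returns "ok" if the check passes, otherwise the failure message.
    static String simulate(boolean useIndexedFormat) {
        SideInputSpec spec = new SideInputSpec(
                "Output1/ParDo(ToIsmRecordForGlobalWindow).out0", useIndexedFormat);
        List<ReaderKind> readers = new ArrayList<>();
        readers.add(createReader(spec));
        try {
            checkIsmOnly(readers);
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("first run:  " + simulate(true));  // flag present, as in the graph above
        System.out.println("second run: " + simulate(false)); // flag absent, as in the second run
    }
}
{code}

Running `main` prints `ok` for the first simulated run and the `AvroByteReader` message for the second. The model only restates the suspected cause; it does not explain why the second `pipeline.run()` produces a graph without `use_indexed_format`.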


was (Author: baeminbo):
Comparing the job graphs of the first and second runs, I saw that the first run had `use_indexed_format` set to true, while the second run had no such field. This affects the type of Reader used for the side input.

> Check state failed at IsmSideInputReader.createReadersFromSources
> -----------------------------------------------------------------
>
>                 Key: BEAM-12618
>                 URL: https://issues.apache.org/jira/browse/BEAM-12618
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Minbo Bae
>            Priority: P2
>
> If a pipeline has a transform with a side input and you call `pipeline.run()` twice to create two Dataflow jobs, the second job fails in the Dataflow worker with the runtime error below.
> {code:java}
> Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException: IsmSideInputReader only supports IsmReader as a reader but was AvroByteReader.
>         org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.lambda$getSideInputReader$0(BatchModeExecutionContext.java:297)
>         org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers$NonSerializableMemoizingSupplier.get(Suppliers.java:167)
>         org.apache.beam.runners.dataflow.worker.LazilyInitializedSideInputReader.get(LazilyInitializedSideInputReader.java:50)
>         org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:267)
>         org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$1200(SimpleDoFnRunner.java:84)
>         org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:395)
>         org.apache.beam.sdk.transforms.Contextful$Fn$Context$ContextFromProcessContext.sideInput(Contextful.java:100)
>         baeminbo.DoubleRunSideinputPipeline.lambda$main$38d062d6$1(DoubleRunSideinputPipeline.java:34)
>         org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:143)
> Caused by: java.lang.IllegalStateException: IsmSideInputReader only supports IsmReader as a reader but was AvroByteReader.
>         org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>         org.apache.beam.runners.dataflow.worker.IsmSideInputReader.createReadersFromSources(IsmSideInputReader.java:253)
>         org.apache.beam.runners.dataflow.worker.IsmSideInputReader.<init>(IsmSideInputReader.java:165)
>         org.apache.beam.runners.dataflow.worker.IsmSideInputReader.of(IsmSideInputReader.java:278)
>         org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.lambda$getSideInputReader$0(BatchModeExecutionContext.java:290)
>         org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers$NonSerializableMemoizingSupplier.get(Suppliers.java:167)
>         org.apache.beam.runners.dataflow.worker.LazilyInitializedSideInputReader.get(LazilyInitializedSideInputReader.java:50)
>         org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:267)
>         org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$1200(SimpleDoFnRunner.java:84)
>         org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:395)
>         org.apache.beam.sdk.transforms.Contextful$Fn$Context$ContextFromProcessContext.sideInput(Contextful.java:100)
>         baeminbo.DoubleRunSideinputPipeline.lambda$main$38d062d6$1(DoubleRunSideinputPipeline.java:34)
>         org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:143)
>         org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>         org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
>         org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
>         org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:339)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
>         org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
>         org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:420)
>         org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:389)
>         org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
>         org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
>         org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
>         org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
>         java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>         java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         java.base/java.lang.Thread.run(Thread.java:834)
> {code}
> You can reproduce this issue with [this sample pipeline|https://github.com/baeminbo/dataflow-pipelines/tree/master/double-run-sideinput/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)