Posted to dev@beam.apache.org by Jan Lukavský <je...@seznam.cz> on 2020/11/04 13:16:50 UTC

Re: Beam 2.25.0 / Flink 1.11.2 - Job failing after upgrading from 2.24.0 / Flink 1.10.2

Hi Tobias,

this looks like a bug. The clearGlobalState method was introduced in 
2.25.0, and it seems to have issues related to RocksDB. Can you file a 
Jira for that, please?

Thanks,

  Jan
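
For readers following the trace quoted below: the root cause line says a java.lang.String was handed to a serializer that expects a VoidNamespace. The sketch below is not Beam or Flink code. It is a minimal, self-contained illustration of how that kind of ClassCastException can arise when a generically typed serializer is called through a raw (type-erased) reference. All names in it (NamespaceCastSketch, EmptyNamespace, NamespaceSerializer, tryWrite) are hypothetical and exist only for this example. Whether this is exactly what happens inside clearGlobalState is an assumption, not something the thread confirms.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class NamespaceCastSketch {

    /** Hypothetical stand-in for a namespace type that has a single instance. */
    static final class EmptyNamespace {
        static final EmptyNamespace INSTANCE = new EmptyNamespace();
        private EmptyNamespace() {}
    }

    /** Hypothetical serializer interface, generic in the namespace type. */
    interface NamespaceSerializer<N> {
        void serialize(N namespace, DataOutputStream out) throws IOException;
    }

    /** Serializer that only knows how to write EmptyNamespace values. */
    static final class EmptyNamespaceSerializer implements NamespaceSerializer<EmptyNamespace> {
        @Override
        public void serialize(EmptyNamespace namespace, DataOutputStream out) throws IOException {
            out.writeByte(0);
        }
    }

    /**
     * Calls the serializer through a raw reference, so the compiler cannot
     * check the namespace type. The cast to EmptyNamespace then happens at
     * run time inside the compiler-generated bridge method of the serializer.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String tryWrite(Object namespace) {
        NamespaceSerializer raw = new EmptyNamespaceSerializer();
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
        try {
            raw.serialize(namespace, out); // unchecked call on a raw type
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        } catch (IOException e) {
            return "IOException";
        }
    }

    public static void main(String[] args) {
        // Matching namespace type: the write succeeds.
        System.out.println(tryWrite(EmptyNamespace.INSTANCE));
        // A String namespace reaches a serializer typed for EmptyNamespace.
        System.out.println(tryWrite("some-string-namespace"));
    }
}
```

Running main prints "ok" for the matching namespace and "ClassCastException" for the String one. The point of the sketch is only that the mismatch surfaces at serialization time rather than at compile time, which is consistent with the failure appearing under VoidNamespaceSerializer.serialize in the trace.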

On 11/4/20 9:50 AM, Kaymak, Tobias wrote:
> When running our Kafka-To-BigQuery pipeline with the Flink 1.11.2 
> Docker image,
> the following exception is visible for the failing job on the *job 
> manager*:
>
> 2020-11-04 09:27:14
> java.lang.RuntimeException: Failed to cleanup global state.
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:77)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getKeys(RocksDBKeyedStateBackend.java:291)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:242)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141)
>     ... 17 more
> This is from the *task manager's* logs:
> 2020-11-04 08:46:31,250 WARN 
>  org.apache.flink.runtime.taskmanager.Task                    [] - 
> BigQueryIO.Write/BatchLoads/JobIdCreationRoot_LOAD/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Splittable 
> ueryIO.Write/BatchLoads/CreateJobId_LOAD/ParMultiDo(Anonymous) -> 
> BigQueryIO.Write/BatchLoads/JobIdSideInput_LOAD/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) 
> -> ToKeyedWorkItem (1/1) (bebac6c581d1b8ece88007ec0 
> java.lang.RuntimeException: Failed to cleanup global state.
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:77) ~[flink-statebackend-rocksdb_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getKeys(RocksDBKeyedStateBackend.java:291) ~[flink-statebackend-rocksdb_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:242) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
>     ... 17 more
>
> I think it might be a "translation" problem. One last thing I want to 
> try before downgrading to Flink 1.10.2 is using Flink 1.11.1 as an 
> executor to see if this is caused by mismatched minor versions.
>
> Best,
> Tobi

Re: Beam 2.25.0 / Flink 1.11.2 - Job failing after upgrading from 2.24.0 / Flink 1.10.2

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
Hi Jan,

thank you for your response. I created a JIRA ticket:
https://issues.apache.org/jira/browse/BEAM-11191



On Wed, Nov 4, 2020 at 2:17 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Tobias,
>
> this looks like a bug. The clearGlobalState method was introduced in
> 2.25.0, and it seems to have issues related to RocksDB. Can you file a
> Jira for that, please?
>
> Thanks,
>
>  Jan