Posted to user@flink.apache.org by Dan Hill <qu...@gmail.com> on 2021/08/20 15:20:09 UTC

Re: savepoint failure

I think this was from a breaking change we made to the key calculation in
our code between version updates.  So this error makes sense.

What's the best way to get more info for debugging?  How can I configure
the logs to output more key information?
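
One idea on my side (a sketch; the class and names are illustrative): wrap our
KeySelector so it logs each key and the key group Flink computes for it,
reusing Flink's own assignment logic:

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingKeySelector<IN, KEY> implements KeySelector<IN, KEY> {
      private static final Logger LOG =
          LoggerFactory.getLogger(LoggingKeySelector.class);
      private final KeySelector<IN, KEY> delegate;
      private final int maxParallelism;

      public LoggingKeySelector(KeySelector<IN, KEY> delegate, int maxParallelism) {
        this.delegate = delegate;
        this.maxParallelism = maxParallelism;
      }

      @Override
      public KEY getKey(IN value) throws Exception {
        KEY key = delegate.getKey(value);
        // Same computation Flink uses to route a key to a key group.
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        LOG.debug("key={} hashCode={} keyGroup={}", key, key.hashCode(), keyGroup);
        return key;
      }
    }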

On Fri, Jul 16, 2021 at 11:29 PM Dan Hill <qu...@gmail.com> wrote:

> Thanks, Till!
>
> On Thu, Jul 15, 2021 at 12:52 AM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Dan,
>>
>> From the logs I couldn't find anything suspicious. The job runs until you
>> try to draw a savepoint. When doing this Flink fails with "Key group 0 is
>> not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}". W/o having access
>> to your job or a minimal example that allows to reproduce this problem, it
>> will be super hard to figure out what's going wrong. My best guess would
>> still be that we have a non-deterministic key somewhere.
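>>
>> For illustration (types and getters made up), the dangerous pattern is a
>> key whose hashCode is not stable across JVM runs:
>>
>>     // Bad: no hashCode override means identity hashing, which differs
>>     // per JVM, so a restored or rescaled job can compute a key group
>>     // outside the range that owns the state.
>>     stream.keyBy(event -> event.getPayload());
>>
>>     // Good: Strings and longs hash deterministically.
>>     stream.keyBy(event -> event.getUserId());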
>>
>> Cheers,
>> Till
>>
>> On Thu, Jul 15, 2021 at 7:26 AM Dan Hill <qu...@gmail.com> wrote:
>>
>>> I don't know if it matters but I'm using unaligned checkpoints.
>>>
>>> On Wed, Jul 14, 2021 at 8:33 PM Dan Hill <qu...@gmail.com> wrote:
>>>
>>>> Here's the overview flow chart.
>>>>
>>>> [image: Screen Shot 2021-07-14 at 8.24.33 PM.png]
>>>>
>>>>
>>>>
>>>> On Wed, Jul 14, 2021 at 7:10 PM Dan Hill <qu...@gmail.com> wrote:
>>>>
>>>>> *-others*
>>>>>
>>>>> *Code*
>>>>> I'm not sure of a good, secure way of sharing the java code.  It
>>>>> depends on multiple internal repos.  The savepoint appears to be failing in
>>>>> a custom KeyedCoProcessFunction that joins two keyed streams in a fuzzy
>>>>> way.  The streams are joined on a Tuple2<String, Long> key, and the
>>>>> function has some internal map state using String keys.
>>>>>
>>>>> *Flink config*
>>>>> The most relevant parts of the flink config are the following:
>>>>> state.backend.async: true
>>>>> state.backend.incremental: true
>>>>> state.backend.local-recovery: false
>>>>> taskmanager.state.local.root-dirs: /flink_state/local-recovery
>>>>> state.backend.rocksdb.checkpoint.transfer.thread.num: 1
>>>>> state.backend.rocksdb.localdir: /flink_state/rocksdb
>>>>> state.backend.rocksdb.options-factory:
>>>>> org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
>>>>> state.backend.rocksdb.predefined-options: DEFAULT
>>>>> state.backend.rocksdb.timer-service.factory: ROCKSDB
>>>>> state.backend.rocksdb.ttl.compaction.filter.enabled: false
>>>>> state.checkpoints.dir: s3a://my-flink-state/checkpoints
>>>>> state.savepoints.dir: s3a://my-metrics-flink-state/savepoints
>>>>>
>>>>> *Workflow*
>>>>> What do you mean by workflow?
>>>>>
>>>>> *Logs*
>>>>> Here's the job manager log.  The task manager log did not look useful.
>>>>>
>>>>> https://drive.google.com/file/d/1jC5-3Bm2OP0dX1GJACwHGeqxd4snFc-W/view?usp=sharing
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jul 14, 2021 at 12:45 AM Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Dan,
>>>>>>
>>>>>> Can you provide us with more information about your job (maybe even
>>>>>> the job code or a minimally working example), the Flink configuration, the
>>>>>> exact workflow you are doing and the corresponding logs and error messages?
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Jul 13, 2021 at 9:39 PM Dan Hill <qu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Could this be caused by mixing of configuration settings when
>>>>>>> running?  Running a job with one parallelism, stop/savepointing and then
>>>>>>> recovering with a different parallelism?  I'd assume that's fine and
>>>>>>> wouldn't create bad state.
>>>>>>>
>>>>>>> On Tue, Jul 13, 2021 at 12:34 PM Dan Hill <qu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I checked my code.  Our keys for streams and map state only use
>>>>>>>> either (1) string, (2) long IDs that don't change or (3) Tuple of 1 and 2.
>>>>>>>>
>>>>>>>> I don't know why my current case is breaking.  Our job partitions
>>>>>>>> and parallelism settings have not changed.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jul 13, 2021 at 12:11 PM Dan Hill <qu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey.  I just hit a similar error in production when trying to
>>>>>>>>> savepoint.  We also use protobufs.
>>>>>>>>>
>>>>>>>>> Has anyone found a better fix to this?
>>>>>>>>>
>>>>>>>>> On Fri, Oct 23, 2020 at 5:21 AM Till Rohrmann <
>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Glad to hear that you solved your problem. Afaik Flink should not
>>>>>>>>>> read the fields of messages and call hashCode on them.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <
>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Till,
>>>>>>>>>>>
>>>>>>>>>>> I found my problem. It was indeed related to a mutable hashcode.
>>>>>>>>>>>
>>>>>>>>>>> I was using a protobuf message in the key selector function, and
>>>>>>>>>>> one of the protobuf fields was an enum. I checked the hashcode
>>>>>>>>>>> implementation of the generated message; it uses the int value field
>>>>>>>>>>> of the protobuf message, so I assumed it was ok and immutable.
>>>>>>>>>>>
>>>>>>>>>>> I replaced the key selector function to use Tuple[Long, Int]
>>>>>>>>>>> (since my protobuf message has only these two fields where the int
>>>>>>>>>>> parameter stands for the enum value field). After changing my code to use
>>>>>>>>>>> the Tuple it worked.
>>>>>>>>>>>
>>>>>>>>>>> I am not sure if Flink somehow reads the protobuf message fields
>>>>>>>>>>> and uses the hashcode of the fields directly since the generated protobuf
>>>>>>>>>>> enum indeed has a mutable hashcode (Enum.hashcode).
>>>>>>>>>>>
>>>>>>>>>>> Nevertheless it's ok with the Tuple key.
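>>>>>>>>>>>
>>>>>>>>>>> Roughly (a sketch; the event type and getters are illustrative):
>>>>>>>>>>> getNumber() is the stable wire value of a protobuf enum, unlike
>>>>>>>>>>> Enum.hashCode, which is identity-based.
>>>>>>>>>>>
>>>>>>>>>>>     stream.keyBy(new KeySelector<MyEvent, Tuple2<Long, Integer>>() {
>>>>>>>>>>>       @Override
>>>>>>>>>>>       public Tuple2<Long, Integer> getKey(MyEvent e) {
>>>>>>>>>>>         return Tuple2.of(e.getId(), e.getType().getNumber());
>>>>>>>>>>>       }
>>>>>>>>>>>     });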
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your response!
>>>>>>>>>>>
>>>>>>>>>>> Best Regards,
>>>>>>>>>>> Rado
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann <
>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Rado,
>>>>>>>>>>>>
>>>>>>>>>>>> it is hard to tell the reason w/o a bit more details. Could you
>>>>>>>>>>>> share with us the complete logs of the problematic run? Also the job you
>>>>>>>>>>>> are running and the types of the state you are storing in RocksDB and use
>>>>>>>>>>>> as events in your job are very important. In the linked SO question, the
>>>>>>>>>>>> problem was a type whose hashcode was not immutable.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Till
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am running a Flink job that performs data enrichment. My job
>>>>>>>>>>>>> has 7 kafka consumers that receive messages for dml statements performed
>>>>>>>>>>>>> for 7 db tables.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Job setup:
>>>>>>>>>>>>>
>>>>>>>>>>>>>    - Flink is run in k8s in a similar way as it is described
>>>>>>>>>>>>>    here
>>>>>>>>>>>>>    <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>
>>>>>>>>>>>>>    .
>>>>>>>>>>>>>    - 1 job manager and 2 task managers
>>>>>>>>>>>>>    - parallelism is set to 4 and 2 task slots
>>>>>>>>>>>>>    - rocksdb as state backend
>>>>>>>>>>>>>    - protobuf for serialization
>>>>>>>>>>>>>
>>>>>>>>>>>>> Whenever I try to trigger a savepoint after my state is
>>>>>>>>>>>>> bootstrapped I get the following error for different operators:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Key group 0 is
>>>>>>>>>>>>> not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Note: key group might vary.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I found this
>>>>>>>>>>>>> <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange> article
>>>>>>>>>>>>> in Stackoverflow which relates to such an exception (btw my job graph looks
>>>>>>>>>>>>> similar to the one described in the article except that my job has more
>>>>>>>>>>>>> joins). I double checked my hashcodes and I think that they are fine.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I tried to reduce the parallelism to 1 with 1 task slot per
>>>>>>>>>>>>> task manager, and this configuration seems to work. This leads me
>>>>>>>>>>>>> to suspect it might be some concurrency issue.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would like to understand what is causing the savepoint
>>>>>>>>>>>>> failure. Do you have any suggestions what I might be missing?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks in advance!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>
>>>>>>>>>>>>

Re: savepoint failure

Posted by Till Rohrmann <tr...@apache.org>.
Great to hear that you figured the problem out Dan and sorry for not
getting back to you sooner. We are in the midst of the release testing and
things are always getting a bit hectic during this time.

Cheers,
Till

On Tue, Aug 31, 2021 at 5:27 AM Dan Hill <qu...@gmail.com> wrote:

> We figured it out.  We had a KeyedCoProcessOperator that had some sanity
> checking using a stored watermark value.  We added it when debugging a
> different issue.
>
> This was tricky to debug because:
> 1. The issue is not hit when the partition=1.  I had to bump the number of
> partitions up to reproduce locally.
> 2. I had to wait until after a checkpoint was hit for the error to occur.
>
> Rough code:
>
>     private ValueState<Long> currentWatermark;
>
>     @Override
>     public void processElement1(StreamRecord<FlatEvent> in) throws Exception {
>       ...
>       if (currentWatermark.value() == null || rowTime > currentWatermark.value()) {
>         in.setTimestamp(rowTime);
>       } else {
>         LOGGER.error("...");
>       }
>       super.processElement1(in);
>     }
>
>     // This check was the culprit: watermarks are not keyed, so there is no
>     // well-defined current key when processWatermark fires, which makes the
>     // keyed-state update below suspect.
>     @Override
>     public void processWatermark(Watermark mark) throws Exception {
>         currentWatermark.update(mark.getTimestamp());
>         super.processWatermark(mark);
>     }
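>
> A safer variant (a sketch, assuming the watermark is only needed for this
> transient sanity check): track it in a plain field, since a watermark is
> per-subtask rather than per-key.
>
>     private long lastSeenWatermark = Long.MIN_VALUE;
>
>     @Override
>     public void processWatermark(Watermark mark) throws Exception {
>         // A plain field avoids touching keyed state outside a key context.
>         lastSeenWatermark = mark.getTimestamp();
>         super.processWatermark(mark);
>     }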
>
> On Mon, Aug 23, 2021 at 2:28 AM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Dan,
>>
>> from the snippets I suspect the following is happening: A TaskManager
>> dies that triggers the failover. The failover will stop the
>> CheckpointCoordinator that will abort all pending checkpoints. Since the
>> `CheckpointFailureManager` seems to be configured to treat these kinds of
>> failures as job failures, it tries to restart the job. See [1] for more
>> information. Given that the job is currently being restarted, this
>> shouldn't do anything. Hence, the job should eventually recover and
>> continue running smoothly, hopefully. For more details, I would need to see
>> the full logs.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html#setTolerableCheckpointFailureNumber-int-
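>>
>> A minimal sketch of that setting (the value here is illustrative):
>>
>>     StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>     // Tolerate up to 3 checkpoint failures before failing the job.
>>     env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);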
>>
>> Cheers,
>> Till
>>
>> On Sun, Aug 22, 2021 at 6:24 AM Dan Hill <qu...@gmail.com> wrote:
>>
>>> I audited my code.  All of the directly coded keys are either Strings,
>>> Longs or Tuple2<String, Long>.
>>>
>>> Does the FileSink use any of the incoming records as keys?  We use
>>> ProtobufDatumWriter to write Protos to Avro files.  Could this have an
>>> issue in it?
>>>
>>> Does any part of Flink's setup try to create a key from a record
>>> automatically?
>>>
>>> private static <T extends GeneratedMessageV3> AvroWriterFactory<T>
>>> getAvroWriterFactory(Class<T> avroProtoClass) {
>>>     return new AvroWriterFactory<T>((AvroBuilder<T>) out -> {
>>>         Schema schema = ProtobufData.get().getSchema(avroProtoClass);
>>>         ProtobufDatumWriter<T> pbWriter = new ProtobufDatumWriter<>(schema);
>>>         DataFileWriter<T> dataFileWriter = new DataFileWriter<>(pbWriter);
>>>         dataFileWriter.setCodec(CodecFactory.snappyCodec());
>>>         dataFileWriter.create(schema, out);
>>>         return dataFileWriter;
>>>     });
>>> }
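>>>
>>> For context, the factory is wired into the sink roughly like this (the
>>> path and proto class are illustrative; AvroWriterFactory is a
>>> BulkWriter.Factory, so it plugs into FileSink.forBulkFormat):
>>>
>>>     FileSink<MyProto> sink = FileSink
>>>         .forBulkFormat(new Path("s3a://my-bucket/flat-user-action"),
>>>                        getAvroWriterFactory(MyProto.class))
>>>         .build();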
>>>
>>>
>>> On Sat, Aug 21, 2021 at 8:52 PM Dan Hill <qu...@gmail.com> wrote:
>>>
>>>> Do people write checkpoint/savepoint recovery tests?  E.g. persist a
>>>> checkpoint from a run and verify that it can be recovered?
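>>>>
>>>> Something like Flink's MiniCluster test harness could do it (a rough
>>>> sketch; ids and paths are illustrative):
>>>>
>>>>     // Used as a JUnit rule in a real test.
>>>>     MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
>>>>         new MiniClusterResourceConfiguration.Builder()
>>>>             .setNumberTaskManagers(1)
>>>>             .setNumberSlotsPerTaskManager(2)
>>>>             .build());
>>>>     // 1. submit the job, 2. trigger a savepoint, ...
>>>>     String path = cluster.getClusterClient()
>>>>         .triggerSavepoint(jobId, "file:///tmp/savepoints").get();
>>>>     // ... 3. resubmit with SavepointRestoreSettings.forPath(path)
>>>>     // and assert on the output.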
>>>>
>>>> Also, I don't always hit this error when task managers restart.  I've
>>>> had plenty of task managers die while running and savepoints usually work.
>>>>
>>>>
>>>> On Sat, Aug 21, 2021 at 8:49 PM Dan Hill <qu...@gmail.com> wrote:
>>>>
>>>>> Darn, nevermind.  I was backfilling a job from the start of our time
>>>>> period (not from any checkpoint/savepoint) and, when I tried to savepoint,
>>>>> the job failed with "Checkpoint Coordinator is suspending.".  Then
>>>>> when I tried to savepoint again, I hit the key group error.
>>>>>
>>>>> 2021-08-22 03:35:42,305 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>>       [] - Join UserViewRequestInsertionImpression -> (Sink Writer:
>>>>> S3 flat-user-impression -> Sink Committer: S3 flat-user-impression, Sink
>>>>> Writer: S3 flat-user-impression fixed -> Sink Committer: S3
>>>>> flat-user-impression fixed) (32/56) (167f59052298fc5d8e9c318958f3cfc4)
>>>>> switched from RUNNING to FAILED on 10.12.101.133:6122-164771 @
>>>>> flink-taskmanager-0.flink-taskmanager.default.svc.cluster.local
>>>>> (dataPort=42979).
>>>>>
>>>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>>>> Connection unexpectedly closed by remote task manager '
>>>>> 10.12.100.55/10.12.100.55:43687'. This might indicate that the remote
>>>>> task manager was lost.
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
>>>>>
>>>>> 2021-08-22 03:35:42,419 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>>       [] - Join UserViewRequestInsertionImpressionAction -> (Sink
>>>>> Writer: S3 flat-user-action -> Sink Committer: S3 flat-user-action, Sink
>>>>> Writer: S3 flat-user-action fixed -> Sink Committer: S3 flat-user-action
>>>>> fixed, Map -> Sink: Kafka flat-user-action-json) (15/56)
>>>>> (3077ef9c918af545a1165d0715b5e6c6) switched from CANCELING to CANCELED.
>>>>>
>>>>> 2021-08-22 03:35:42,419 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>>       [] - Discarding the results produced by task execution
>>>>> 3077ef9c918af545a1165d0715b5e6c6.
>>>>>
>>>>> 2021-08-22 03:35:42,422 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>>>>>                 [] - Trying to recover from a global failure.
>>>>>
>>>>> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
>>>>> Coordinator is suspending.
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1740)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:47)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1812)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1326)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1298)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:582)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:291)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:275)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:258)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:234)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:449)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown
>>>>> Source) ~[?:?]
>>>>>
>>>>>         at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> ~[?:1.8.0_292]
>>>>>
>>>>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> ~[?:1.8.0_292]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>         at
>>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Aug 20, 2021 at 8:43 AM Dan Hill <qu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks, Till!
>>>>>>
>>>>>> On Fri, Aug 20, 2021 at 8:31 AM Till Rohrmann <tr...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Dan, good to hear that you found the problem. What I would
>>>>>>> recommend is to set the log level in the log4j.properties file to DEBUG or
>>>>>>> TRACE (but this is quite noisy). If then the log does not contain the
>>>>>>> required information then it is likely that we don't log it and, hence,
>>>>>>> would have to be added.
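>>>>>>>
>>>>>>> For example, in the log4j.properties Flink ships (log4j2 format in
>>>>>>> recent Flink versions; the narrower logger name is illustrative):
>>>>>>>
>>>>>>>     rootLogger.level = DEBUG
>>>>>>>     # or, to avoid most of the noise, target the state code only:
>>>>>>>     logger.statedbg.name = org.apache.flink.runtime.state
>>>>>>>     logger.statedbg.level = TRACE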
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Fri, Aug 20, 2021 at 5:20 PM Dan Hill <qu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I think this was from a breaking change we made to the key
>>>>>>>> calculation in our code between version updates.  So this error makes sense.
>>>>>>>>
>>>>>>>> What's the best way to get more info for debugging?  How can I
>>>>>>>> configure the logs to output more key information?
>>>>>>>>
>>>>>>>> On Fri, Jul 16, 2021 at 11:29 PM Dan Hill <qu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks, Till!
>>>>>>>>>
>>>>>>>>> On Thu, Jul 15, 2021 at 12:52 AM Till Rohrmann <
>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Dan,
>>>>>>>>>>
>>>>>>>>>> From the logs I couldn't find anything suspicious. The job runs
>>>>>>>>>> until you try to draw a savepoint. When doing this Flink fails with "Key
>>>>>>>>>> group 0 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}". W/o
>>>>>>>>>> having access to your job or a minimal example that allows to reproduce
>>>>>>>>>> this problem, it will be super hard to figure out what's going wrong. My
>>>>>>>>>> best guess would still be that we have a non-deterministic key somewhere.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 15, 2021 at 7:26 AM Dan Hill <qu...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I don't know if it matters but I'm using unaligned checkpoints.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 14, 2021 at 8:33 PM Dan Hill <qu...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Here's the overview flow chart.
>>>>>>>>>>>>
>>>>>>>>>>>> [image: Screen Shot 2021-07-14 at 8.24.33 PM.png]
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jul 14, 2021 at 7:10 PM Dan Hill <qu...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> *-others*
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Code*
>>>>>>>>>>>>> I'm not sure of a good, secure way of sharing the java code.
>>>>>>>>>>>>> It depends on multiple internal repos.  The savepoint appears to be failing
>>>>>>>>>>>>> in a custom KeyedCoProcessFunction that joins two keyed streams in a fuzzy
>>>>>>>>>>>>> way.  The streams are joined on a Tuple2<String, Long> key, and the
>>>>>>>>>>>>> function has some internal map state using String keys.
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Flink config*
>>>>>>>>>>>>> The most relevant parts of the flink config are the following:
>>>>>>>>>>>>> state.backend.async: true
>>>>>>>>>>>>> state.backend.incremental: true
>>>>>>>>>>>>> state.backend.local-recovery: false
>>>>>>>>>>>>> taskmanager.state.local.root-dirs: /flink_state/local-recovery
>>>>>>>>>>>>> state.backend.rocksdb.checkpoint.transfer.thread.num: 1
>>>>>>>>>>>>> state.backend.rocksdb.localdir: /flink_state/rocksdb
>>>>>>>>>>>>> state.backend.rocksdb.options-factory:
>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
>>>>>>>>>>>>> state.backend.rocksdb.predefined-options: DEFAULT
>>>>>>>>>>>>> state.backend.rocksdb.timer-service.factory: ROCKSDB
>>>>>>>>>>>>> state.backend.rocksdb.ttl.compaction.filter.enabled: false
>>>>>>>>>>>>> state.checkpoints.dir: s3a://my-flink-state/checkpoints
>>>>>>>>>>>>> state.savepoints.dir: s3a://my-metrics-flink-state/savepoints
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Workflow*
>>>>>>>>>>>>> What do you mean by workflow?
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Logs*
>>>>>>>>>>>>> Here's the job manager log.  The task manager log did not look
>>>>>>>>>>>>> useful.
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://drive.google.com/file/d/1jC5-3Bm2OP0dX1GJACwHGeqxd4snFc-W/view?usp=sharing
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jul 14, 2021 at 12:45 AM Till Rohrmann <
>>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Dan,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can you provide us with more information about your job
>>>>>>>>>>>>>> (maybe even the job code or a minimally working example), the Flink
>>>>>>>>>>>>>> configuration, the exact workflow you are doing and the corresponding logs
>>>>>>>>>>>>>> and error messages?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jul 13, 2021 at 9:39 PM Dan Hill <
>>>>>>>>>>>>>> quietgolfer@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Could this be caused by mixing of configuration settings
>>>>>>>>>>>>>>> when running?  Running a job with one parallelism, stop/savepointing and
>>>>>>>>>>>>>>> then recovering with a different parallelism?  I'd assume that's fine and
>>>>>>>>>>>>>>> wouldn't create bad state.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jul 13, 2021 at 12:34 PM Dan Hill <
>>>>>>>>>>>>>>> quietgolfer@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I checked my code.  Our keys for streams and map state only
>>>>>>>>>>>>>>>> use either (1) string, (2) long IDs that don't change or (3) Tuple of 1 and
>>>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I don't know why my current case is breaking.  Our job
>>>>>>>>>>>>>>>> partitions and parallelism settings have not changed.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Jul 13, 2021 at 12:11 PM Dan Hill <
>>>>>>>>>>>>>>>> quietgolfer@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hey.  I just hit a similar error in production when trying
>>>>>>>>>>>>>>>>> to savepoint.  We also use protobufs.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Has anyone found a better fix to this?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 5:21 AM Till Rohrmann <
>>>>>>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Glad to hear that you solved your problem. Afaik Flink
>>>>>>>>>>>>>>>>>> should not read the fields of messages and call hashCode on them.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <
>>>>>>>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I found my problem. It was indeed related to a mutable
>>>>>>>>>>>>>>>>>>> hashcode.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I was using a protobuf message in the key selector
>>>>>>>>>>>>>>>>>>> function, and one of the protobuf fields was an enum. I checked the
>>>>>>>>>>>>>>>>>>> hashcode implementation of the generated message; it uses the int value
>>>>>>>>>>>>>>>>>>> field of the protobuf message, so I assumed it was ok and immutable.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I replaced the key selector function to use Tuple[Long,
>>>>>>>>>>>>>>>>>>> Int] (since my protobuf message has only these two fields where the int
>>>>>>>>>>>>>>>>>>> parameter stands for the enum value field). After changing my code to use
>>>>>>>>>>>>>>>>>>> the Tuple it worked.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I am not sure if Flink somehow reads the protobuf
>>>>>>>>>>>>>>>>>>> message fields and uses the hashcode of the fields directly since the
>>>>>>>>>>>>>>>>>>> generated protobuf enum indeed has a mutable hashcode (Enum.hashcode).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Nevertheless it's ok with the Tuple key.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for your response!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann <
>>>>>>>>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Rado,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> it is hard to tell the reason w/o a bit more details.
>>>>>>>>>>>>>>>>>>>> Could you share with us the complete logs of the problematic run? Also the
>>>>>>>>>>>>>>>>>>>> job you are running and the types of the state you are storing in RocksDB
>>>>>>>>>>>>>>>>>>>> and use as events in your job are very important. In the linked SO
>>>>>>>>>>>>>>>>>>>> question, the problem was a type whose hashcode was not immutable.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
>>>>>>>>>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I am running a Flink job that performs data
>>>>>>>>>>>>>>>>>>>>> enrichment. My job has 7 kafka consumers that receive messages for dml
>>>>>>>>>>>>>>>>>>>>> statements performed for 7 db tables.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Job setup:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>    - Flink is run in k8s in a similar way as it is
>>>>>>>>>>>>>>>>>>>>>    described here
>>>>>>>>>>>>>>>>>>>>>    <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>
>>>>>>>>>>>>>>>>>>>>>    .
>>>>>>>>>>>>>>>>>>>>>    - 1 job manager and 2 task managers
>>>>>>>>>>>>>>>>>>>>>    - parallelism is set to 4 and 2 task slots
>>>>>>>>>>>>>>>>>>>>>    - rocksdb as state backend
>>>>>>>>>>>>>>>>>>>>>    - protobuf for serialization
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Whenever I try to trigger a savepoint after my state
>>>>>>>>>>>>>>>>>>>>> is bootstrapped I get the following error for different operators:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Key
>>>>>>>>>>>>>>>>>>>>> group 0 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
>>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
>>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Note: key group might vary.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I found this
>>>>>>>>>>>>>>>>>>>>> <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange> article
>>>>>>>>>>>>>>>>>>>>> in Stackoverflow which relates to such an exception (btw my job graph looks
>>>>>>>>>>>>>>>>>>>>> similar to the one described in the article except that my job has more
>>>>>>>>>>>>>>>>>>>>> joins). I double checked my hashcodes and I think that they are fine.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I tried to reduce the parallelism to 1 with 1 task
>>>>>>>>>>>>>>>>>>>>> slot per task manager, and this configuration seems to work. This leads
>>>>>>>>>>>>>>>>>>>>> me to suspect it might be some concurrency issue.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I would like to understand what is causing the
>>>>>>>>>>>>>>>>>>>>> savepoint failure. Do you have any suggestions what I might be missing?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks in advance!
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>

Re: savepoint failure

Posted by Dan Hill <qu...@gmail.com>.
We figured it out.  We had a KeyedCoProcessOperator that had some sanity
checking using a stored watermark value.  We added it when debugging a
different issue.

This was tricky to debug because:
1. The issue is not hit when the partition=1.  I had to bump the number of
partitions up to reproduce locally.
2. I had to wait until after a checkpoint was hit for the error to occur.

Rough code:

    private ValueState<Long> currentWatermark;


    @Override

    public void processElement1(StreamRecord<FlatEvent> in) throws
Exception {

      ...

      if (currentWatermark.value() == null || rowTime>
currentWatermark.value()) {

        in.setTimestamp(rowTime);

      } else {

        LOGGER.error("...")

      }

      super.processElement1(in);

    }


    @Override

    public void processWatermark(Watermark mark) throws Exception {

        currentWatermark.update(mark.getTimestamp());

        super.processWatermark(mark);

    }

On Mon, Aug 23, 2021 at 2:28 AM Till Rohrmann <tr...@apache.org> wrote:

> Hi Dan,
>
> from the snippets I suspect the following is happening: A TaskManager dies
> that triggers the failover. The failover will stop the
> CheckpointCoordinator that will abort all pending checkpoints. Since the
> `CheckpointFailureManager` seems to be configured to treat these kinds of
> failures as job failures, it tries to restart the job. See [1] for more
> information. Given that the job is currently being restarted, this
> shouldn't do anything. Hence, the job should eventually recover and
> continue running smoothly, hopefully. For more details, I would need to see
> the full logs.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html#setTolerableCheckpointFailureNumber-int-
>
> Cheers,
> Till
>
> On Sun, Aug 22, 2021 at 6:24 AM Dan Hill <qu...@gmail.com> wrote:
>
>> I audited my code.  All of the directly coded keys are either Strings,
>> Longs or Tuple2<String, Long>.
>>
>> Does the FileSink use any of the incoming records as keys?  We use
>> ProtobufDatumWriter to write Protos to Avro files.  Could this have an
>> issue in it?
>>
>> Does any part of Flink's setup try to create a key from a record
>> automatically?
>>
>> private static <T extends GeneratedMessageV3> AvroWriterFactory<T>
>> getAvroWriterFactory(Class<T> avroProtoClass) {
>>
>>     return new AvroWriterFactory<T>((AvroBuilder<T>) out -> {
>>
>>         Schema schema = ProtobufData.get().getSchema(avroProtoClass);
>>
>>         ProtobufDatumWriter<T> pbWriter = new
>> ProtobufDatumWriter<>(schema);
>>
>>         DataFileWriter<T> dataFileWriter = new
>> DataFileWriter<>(pbWriter);
>>
>>         dataFileWriter.setCodec(CodecFactory.snappyCodec());
>>
>>         dataFileWriter.create(schema, out);
>>
>>         return dataFileWriter;
>>
>>     });
>>
>> }
>>
>>
>> On Sat, Aug 21, 2021 at 8:52 PM Dan Hill <qu...@gmail.com> wrote:
>>
>>> Do people write checkpoint/savepoint recovery tests?  E.g. persist a
>>> checkpoint from a run and verify that it can be recovered?
>>>
>>> Also, I don't always hit this error when task managers restart.  I've
>>> had plenty of task managers die while running and savepoints usually work.
>>>
>>>
>>> On Sat, Aug 21, 2021 at 8:49 PM Dan Hill <qu...@gmail.com> wrote:
>>>
>>>> Darn, nevermind.  I was backfilling a job from the start of our time
>>>> period (not from any checkpoint/savepoint) and, when I tried to savepoint,
>>>> the job failed with "Checkpoint Coordinator is suspending.".  Then
>>>> when I tried to savepoint again, I hit the key group error.
>>>>
>>>> 2021-08-22 03:35:42,305 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>       [] - Join UserViewRequestInsertionImpression -> (Sink Writer: S3
>>>> flat-user-impression -> Sink Committer: S3 flat-user-impression, Sink
>>>> Writer: S3 flat-user-impression fixed -> Sink Committer: S3
>>>> flat-user-impression fixed) (32/56) (167f59052298fc5d8e9c318958f3cfc4)
>>>> switched from RUNNING to FAILED on 10.12.101.133:6122-164771 @
>>>> flink-taskmanager-0.flink-taskmanager.default.svc.cluster.local
>>>> (dataPort=42979).
>>>>
>>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>>> Connection unexpectedly closed by remote task manager '
>>>> 10.12.100.55/10.12.100.55:43687'. This might indicate that the remote
>>>> task manager was lost.
>>>>
>>>>         at
>>>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
>>>>
>>>> 2021-08-22 03:35:42,419 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>       [] - Join UserViewRequestInsertionImpressionAction -> (Sink
>>>> Writer: S3 flat-user-action -> Sink Committer: S3 flat-user-action, Sink
>>>> Writer: S3 flat-user-action fixed -> Sink Committer: S3 flat-user-action
>>>> fixed, Map -> Sink: Kafka flat-user-action-json) (15/56)
>>>> (3077ef9c918af545a1165d0715b5e6c6) switched from CANCELING to CANCELED.
>>>>
>>>> 2021-08-22 03:35:42,419 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>       [] - Discarding the results produced by task execution
>>>> 3077ef9c918af545a1165d0715b5e6c6.
>>>>
>>>> 2021-08-22 03:35:42,422 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>>>>                 [] - Trying to recover from a global failure.
>>>>
>>>> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
>>>> Coordinator is suspending.
>>>>
>>>>         at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1740)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:47)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1812)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1326)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1298)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:582)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:291)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:275)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:258)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:234)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:449)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown
>>>> Source) ~[?:?]
>>>>
>>>>         at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> ~[?:1.8.0_292]
>>>>
>>>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>>> ~[?:1.8.0_292]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>         at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>>
>>>>
>>>> On Fri, Aug 20, 2021 at 8:43 AM Dan Hill <qu...@gmail.com> wrote:
>>>>
>>>>> Thanks, Till!
>>>>>
>>>>> On Fri, Aug 20, 2021 at 8:31 AM Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Dan, good to hear that you found the problem. What I would
>>>>>> recommend is to set the log level in the log4j.properties file to DEBUG or
>>>>>> TRACE (though this is quite noisy). If the log then still does not
>>>>>> contain the required information, it is likely that we don't log it
>>>>>> and, hence, it would have to be added.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Fri, Aug 20, 2021 at 5:20 PM Dan Hill <qu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I think this was from a breaking change we made to the key
>>>>>>> calculation in our code between version updates.  So this error makes sense.
>>>>>>>
>>>>>>> What's the best way to get more info for debugging?  How can I
>>>>>>> configure the logs to output more key information?
>>>>>>>
>>>>>>> On Fri, Jul 16, 2021 at 11:29 PM Dan Hill <qu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks, Till!
>>>>>>>>
>>>>>>>> On Thu, Jul 15, 2021 at 12:52 AM Till Rohrmann <
>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Dan,
>>>>>>>>>
>>>>>>>>> From the logs I couldn't find anything suspicious. The job runs
>>>>>>>>> until you try to draw a savepoint. When doing this Flink fails with "Key
>>>>>>>>> group 0 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}". W/o
>>>>>>>>> having access to your job or a minimal example that allows to reproduce
>>>>>>>>> this problem, it will be super hard to figure out what's going wrong. My
>>>>>>>>> best guess would still be that we have a non deterministic key somewhere.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Thu, Jul 15, 2021 at 7:26 AM Dan Hill <qu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I don't know if it matters but I'm using unaligned checkpoints.
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 14, 2021 at 8:33 PM Dan Hill <qu...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Here's the overview flow chart.
>>>>>>>>>>>
>>>>>>>>>>> [image: Screen Shot 2021-07-14 at 8.24.33 PM.png]
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 14, 2021 at 7:10 PM Dan Hill <qu...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> *-others*
>>>>>>>>>>>>
>>>>>>>>>>>> *Code*
>>>>>>>>>>>> I'm not sure of a good, secure way of sharing the java code.
>>>>>>>>>>>> It depends on multiple internal repos.  The savepoint appears to be failing
>>>>>>>>>>>> in a custom KeyedCoProcessFunction that joins two keyed streams in a fuzzy
>>>>>>>>>>>> way.  The streams are joined based on a Tuple2<String, Long> and has some
>>>>>>>>>>>> internal map state using String keys.
>>>>>>>>>>>>
>>>>>>>>>>>> *Flink config*
>>>>>>>>>>>> The most relevant parts of the flink config are the following:
>>>>>>>>>>>> state.backend.async: true
>>>>>>>>>>>> state.backend.incremental: true
>>>>>>>>>>>> state.backend.local-recovery: false
>>>>>>>>>>>> taskmanager.state.local.root-dirs: /flink_state/local-recovery
>>>>>>>>>>>> state.backend.rocksdb.checkpoint.transfer.thread.num: 1
>>>>>>>>>>>> state.backend.rocksdb.localdir: /flink_state/rocksdb
>>>>>>>>>>>> state.backend.rocksdb.options-factory:
>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
>>>>>>>>>>>> state.backend.rocksdb.predefined-options: DEFAULT
>>>>>>>>>>>> state.backend.rocksdb.timer-service.factory: ROCKSDB
>>>>>>>>>>>> state.backend.rocksdb.ttl.compaction.filter.enabled: false
>>>>>>>>>>>> state.checkpoints.dir: s3a://my-flink-state/checkpoints
>>>>>>>>>>>> state.savepoints.dir: s3a://my-metrics-flink-state/savepoints
>>>>>>>>>>>>
>>>>>>>>>>>> *Workflow*
>>>>>>>>>>>> What do you mean by workflow?
>>>>>>>>>>>>
>>>>>>>>>>>> *Logs*
>>>>>>>>>>>> Here's the job manager log.  The task manager log did not look
>>>>>>>>>>>> useful.
>>>>>>>>>>>>
>>>>>>>>>>>> https://drive.google.com/file/d/1jC5-3Bm2OP0dX1GJACwHGeqxd4snFc-W/view?usp=sharing
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jul 14, 2021 at 12:45 AM Till Rohrmann <
>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Dan,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can you provide us with more information about your job (maybe
>>>>>>>>>>>>> even the job code or a minimally working example), the Flink configuration,
>>>>>>>>>>>>> the exact workflow you are doing and the corresponding logs and error
>>>>>>>>>>>>> messages?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Till
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jul 13, 2021 at 9:39 PM Dan Hill <
>>>>>>>>>>>>> quietgolfer@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Could this be caused by mixing of configuration settings when
>>>>>>>>>>>>>> running?  Running a job with one parallelism, stop/savepointing and then
>>>>>>>>>>>>>> recovering with a different parallelism?  I'd assume that's fine and
>>>>>>>>>>>>>> wouldn't create bad state.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jul 13, 2021 at 12:34 PM Dan Hill <
>>>>>>>>>>>>>> quietgolfer@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I checked my code.  Our keys for streams and map state only
>>>>>>>>>>>>>>> use either (1) string, (2) long IDs that don't change or (3) Tuple of 1 and
>>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I don't know why my current case is breaking.  Our job
>>>>>>>>>>>>>>> partitions and parallelism settings have not changed.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jul 13, 2021 at 12:11 PM Dan Hill <
>>>>>>>>>>>>>>> quietgolfer@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hey.  I just hit a similar error in production when trying
>>>>>>>>>>>>>>>> to savepoint.  We also use protobufs.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Has anyone found a better fix to this?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 5:21 AM Till Rohrmann <
>>>>>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Glad to hear that you solved your problem. Afaik Flink
>>>>>>>>>>>>>>>>> should not read the fields of messages and call hashCode on them.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <
>>>>>>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I found my problem. It was indeed related to a mutable
>>>>>>>>>>>>>>>>>> hashcode.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I was using a protobuf message in the key selector
>>>>>>>>>>>>>>>>>> function, and one of the protobuf fields was an enum. I checked the
>>>>>>>>>>>>>>>>>> implementation of the hashcode of the generated message: it uses the
>>>>>>>>>>>>>>>>>> int value field of the protobuf message, so I assumed it is ok and
>>>>>>>>>>>>>>>>>> immutable.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I replaced the key selector function to use Tuple[Long,
>>>>>>>>>>>>>>>>>> Int] (since my protobuf message has only these two fields where the int
>>>>>>>>>>>>>>>>>> parameter stands for the enum value field). After changing my code to use
>>>>>>>>>>>>>>>>>> the Tuple it worked.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am not sure if Flink somehow reads the protobuf message
>>>>>>>>>>>>>>>>>> fields and uses the hashcode of the fields directly since the generated
>>>>>>>>>>>>>>>>>> protobuf enum indeed has a mutable hashcode (Enum.hashcode).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Nevertheless it's ok with the Tuple key.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for your response!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann <
>>>>>>>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Rado,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> it is hard to tell the reason w/o a bit more details.
>>>>>>>>>>>>>>>>>>> Could you share with us the complete logs of the problematic run? Also the
>>>>>>>>>>>>>>>>>>> job you are running and the types of the state you are storing in RocksDB
>>>>>>>>>>>>>>>>>>> and use as events in your job are very important. In the linked SO
>>>>>>>>>>>>>>>>>>> question, the problem was a type whose hashcode was not immutable.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
>>>>>>>>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I am running a Flink job that performs data enrichment.
>>>>>>>>>>>>>>>>>>>> My job has 7 kafka consumers that receive messages for dml statements
>>>>>>>>>>>>>>>>>>>> performed on 7 db tables.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Job setup:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>    - Flink is run in k8s in a similar way as it is
>>>>>>>>>>>>>>>>>>>>    described here
>>>>>>>>>>>>>>>>>>>>    <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>
>>>>>>>>>>>>>>>>>>>>    .
>>>>>>>>>>>>>>>>>>>>    - 1 job manager and 2 task managers
>>>>>>>>>>>>>>>>>>>>    - parallelism is set to 4 and 2 task slots
>>>>>>>>>>>>>>>>>>>>    - rocksdb as state backend
>>>>>>>>>>>>>>>>>>>>    - protobuf for serialization
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Whenever I try to trigger a savepoint after my state is
>>>>>>>>>>>>>>>>>>>> bootstrapped I get the following error for different operators:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Key
>>>>>>>>>>>>>>>>>>>> group 0 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Note: key group might vary.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I found this
>>>>>>>>>>>>>>>>>>>> <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange> article
>>>>>>>>>>>>>>>>>>>> in Stackoverflow which relates to such an exception (btw my job graph looks
>>>>>>>>>>>>>>>>>>>> similar to the one described in the article except that my job has more
>>>>>>>>>>>>>>>>>>>> joins). I double checked my hashcodes and I think that they are fine.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I tried to reduce the parallelism to 1 with 1 task slot
>>>>>>>>>>>>>>>>>>>> per task manager, and this configuration seems to work. This leads me
>>>>>>>>>>>>>>>>>>>> to suspect some kind of concurrency issue.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I would like to understand what is causing the
>>>>>>>>>>>>>>>>>>>> savepoint failure. Do you have any suggestions what I might be missing?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks in advance!
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>

Re: savepoint failure

Posted by Till Rohrmann <tr...@apache.org>.
Hi Dan,

from the snippets I suspect the following is happening: A TaskManager dies,
which triggers the failover. The failover stops the CheckpointCoordinator,
which aborts all pending checkpoints. Since the `CheckpointFailureManager`
seems to be configured to treat these kinds of failures as job failures, it
tries to restart the job. See [1] for more information. Given that the job
is already being restarted, this shouldn't do anything further. Hence, the
job should eventually recover and, hopefully, continue running smoothly. For
more details, I would need to see the full logs.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html#setTolerableCheckpointFailureNumber-int-
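
As a minimal sketch, that threshold from [1] is set on the CheckpointConfig;
the checkpoint interval and the value 3 below are just example numbers, not a
recommendation:

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L); // checkpoint every 60 seconds
// Tolerate up to 3 consecutive checkpoint failures before failing the job.
// With the default of 0, a single failed checkpoint fails the job.
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);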

Cheers,
Till

On Sun, Aug 22, 2021 at 6:24 AM Dan Hill <qu...@gmail.com> wrote:

> I audited my code.  All of the directly coded keys are either Strings,
> Longs or Tuple2<String, Long>.
>
> Does the FileSink use any of the incoming records as keys?  We use
> ProtobufDatumWriter to write Protos to Avro files.  Could this have an
> issue in it?
>
> Does any part of Flink's setup try to create a key from a record
> automatically?
>
> private static <T extends GeneratedMessageV3> AvroWriterFactory<T>
> getAvroWriterFactory(Class<T> avroProtoClass) {
>
>     return new AvroWriterFactory<T>((AvroBuilder<T>) out -> {
>
>         Schema schema = ProtobufData.get().getSchema(avroProtoClass);
>
>         ProtobufDatumWriter<T> pbWriter = new
> ProtobufDatumWriter<>(schema);
>
>         DataFileWriter<T> dataFileWriter = new DataFileWriter<>(pbWriter);
>
>         dataFileWriter.setCodec(CodecFactory.snappyCodec());
>
>         dataFileWriter.create(schema, out);
>
>         return dataFileWriter;
>
>     });
>
> }
>
>
> On Sat, Aug 21, 2021 at 8:52 PM Dan Hill <qu...@gmail.com> wrote:
>
>> Do people write checkpoint/savepoint recovery tests?  E.g. persist a
>> checkpoint from a run and verify that it can be recovered?
>>
>> Also, I don't always hit this error when task managers restart.  I've had
>> plenty of task managers die while running and savepoints usually work.
>>
>>
>> On Sat, Aug 21, 2021 at 8:49 PM Dan Hill <qu...@gmail.com> wrote:
>>
>>> Darn, nevermind.  I was backfilling a job from the start of our time
>>> period (not from any checkpoint/savepoint) and, when I tried to savepoint,
>>> the job failed with "Checkpoint Coordinator is suspending."  Then when
>>> I tried to savepoint again, I hit the key group error.
>>>
>>> 2021-08-22 03:35:42,305 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>       [] - Join UserViewRequestInsertionImpression -> (Sink Writer: S3
>>> flat-user-impression -> Sink Committer: S3 flat-user-impression, Sink
>>> Writer: S3 flat-user-impression fixed -> Sink Committer: S3
>>> flat-user-impression fixed) (32/56) (167f59052298fc5d8e9c318958f3cfc4)
>>> switched from RUNNING to FAILED on 10.12.101.133:6122-164771 @
>>> flink-taskmanager-0.flink-taskmanager.default.svc.cluster.local
>>> (dataPort=42979).
>>>
>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>> Connection unexpectedly closed by remote task manager '
>>> 10.12.100.55/10.12.100.55:43687'. This might indicate that the remote
>>> task manager was lost.
>>>
>>>         at
>>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
>>>
>>> 2021-08-22 03:35:42,419 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>       [] - Join UserViewRequestInsertionImpressionAction -> (Sink
>>> Writer: S3 flat-user-action -> Sink Committer: S3 flat-user-action, Sink
>>> Writer: S3 flat-user-action fixed -> Sink Committer: S3 flat-user-action
>>> fixed, Map -> Sink: Kafka flat-user-action-json) (15/56)
>>> (3077ef9c918af545a1165d0715b5e6c6) switched from CANCELING to CANCELED.
>>>
>>> 2021-08-22 03:35:42,419 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>       [] - Discarding the results produced by task execution
>>> 3077ef9c918af545a1165d0715b5e6c6.
>>>
>>> 2021-08-22 03:35:42,422 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>>>                 [] - Trying to recover from a global failure.
>>>
>>> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
>>> Coordinator is suspending.
>>>
>>>         at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1740)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:47)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1812)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1326)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1298)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:582)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:291)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:275)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:258)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:234)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:449)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source)
>>> ~[?:?]
>>>
>>>         at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> ~[?:1.8.0_292]
>>>
>>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>> ~[?:1.8.0_292]
>>>
>>>         at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>         at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>>
>>>
>>> On Fri, Aug 20, 2021 at 8:43 AM Dan Hill <qu...@gmail.com> wrote:
>>>
>>>> Thanks, Till!
>>>>
>>>> On Fri, Aug 20, 2021 at 8:31 AM Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Dan, good to hear that you found the problem. What I would
>>>>> recommend is to set the log level in the log4j.properties file to DEBUG or
>>>>> TRACE (though this is quite noisy). If the log then still does not
>>>>> contain the required information, it is likely that we don't log it
>>>>> and, hence, it would have to be added.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Fri, Aug 20, 2021 at 5:20 PM Dan Hill <qu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I think this was from a breaking change we made to the key
>>>>>> calculation in our code between version updates.  So this error makes sense.
>>>>>>
>>>>>> What's the best way to get more info for debugging?  How can I
>>>>>> configure the logs to output more key information?
>>>>>>
>>>>>> On Fri, Jul 16, 2021 at 11:29 PM Dan Hill <qu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks, Till!
>>>>>>>
>>>>>>> On Thu, Jul 15, 2021 at 12:52 AM Till Rohrmann <tr...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Dan,
>>>>>>>>
>>>>>>>> From the logs I couldn't find anything suspicious. The job runs
>>>>>>>> until you try to draw a savepoint. When doing this Flink fails with "Key
>>>>>>>> group 0 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}". W/o
>>>>>>>> having access to your job or a minimal example that allows to reproduce
>>>>>>>> this problem, it will be super hard to figure out what's going wrong. My
>>>>>>>> best guess would still be that we have a non deterministic key somewhere.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Thu, Jul 15, 2021 at 7:26 AM Dan Hill <qu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I don't know if it matters but I'm using unaligned checkpoints.
>>>>>>>>>
>>>>>>>>> On Wed, Jul 14, 2021 at 8:33 PM Dan Hill <qu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Here's the overview flow chart.
>>>>>>>>>>
>>>>>>>>>> [image: Screen Shot 2021-07-14 at 8.24.33 PM.png]
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 14, 2021 at 7:10 PM Dan Hill <qu...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> *-others*
>>>>>>>>>>>
>>>>>>>>>>> *Code*
>>>>>>>>>>> I'm not sure of a good, secure way of sharing the java code.  It
>>>>>>>>>>> depends on multiple internal repos.  The savepoint appears to be failing in
>>>>>>>>>>> a custom KeyedCoProcessFunction that joins two keyed streams in a fuzzy
>>>>>>>>>>> way.  The streams are joined based on a Tuple2<String, Long> and has some
>>>>>>>>>>> internal map state using String keys.
>>>>>>>>>>>
>>>>>>>>>>> *Flink config*
>>>>>>>>>>> The most relevant parts of the flink config are the following:
>>>>>>>>>>> state.backend.async: true
>>>>>>>>>>> state.backend.incremental: true
>>>>>>>>>>> state.backend.local-recovery: false
>>>>>>>>>>> taskmanager.state.local.root-dirs: /flink_state/local-recovery
>>>>>>>>>>> state.backend.rocksdb.checkpoint.transfer.thread.num: 1
>>>>>>>>>>> state.backend.rocksdb.localdir: /flink_state/rocksdb
>>>>>>>>>>> state.backend.rocksdb.options-factory:
>>>>>>>>>>> org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
>>>>>>>>>>> state.backend.rocksdb.predefined-options: DEFAULT
>>>>>>>>>>> state.backend.rocksdb.timer-service.factory: ROCKSDB
>>>>>>>>>>> state.backend.rocksdb.ttl.compaction.filter.enabled: false
>>>>>>>>>>> state.checkpoints.dir: s3a://my-flink-state/checkpoints
>>>>>>>>>>> state.savepoints.dir: s3a://my-metrics-flink-state/savepoints
>>>>>>>>>>>
>>>>>>>>>>> *Workflow*
>>>>>>>>>>> What do you mean by workflow?
>>>>>>>>>>>
>>>>>>>>>>> *Logs*
>>>>>>>>>>> Here's the job manager log.  The task manager log did not look
>>>>>>>>>>> useful.
>>>>>>>>>>>
>>>>>>>>>>> https://drive.google.com/file/d/1jC5-3Bm2OP0dX1GJACwHGeqxd4snFc-W/view?usp=sharing
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 14, 2021 at 12:45 AM Till Rohrmann <
>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Dan,
>>>>>>>>>>>>
>>>>>>>>>>>> Can you provide us with more information about your job (maybe
>>>>>>>>>>>> even the job code or a minimally working example), the Flink configuration,
>>>>>>>>>>>> the exact workflow you are doing and the corresponding logs and error
>>>>>>>>>>>> messages?
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Till
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jul 13, 2021 at 9:39 PM Dan Hill <qu...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Could this be caused by mixing of configuration settings when
>>>>>>>>>>>>> running?  Running a job with one parallelism, stop/savepointing and then
>>>>>>>>>>>>> recovering with a different parallelism?  I'd assume that's fine and
>>>>>>>>>>>>> wouldn't create bad state.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jul 13, 2021 at 12:34 PM Dan Hill <
>>>>>>>>>>>>> quietgolfer@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I checked my code.  Our keys for streams and map state only
>>>>>>>>>>>>>> use either (1) string, (2) long IDs that don't change or (3) Tuple of 1 and
>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I don't know why my current case is breaking.  Our job
>>>>>>>>>>>>>> partitions and parallelism settings have not changed.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jul 13, 2021 at 12:11 PM Dan Hill <
>>>>>>>>>>>>>> quietgolfer@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hey.  I just hit a similar error in production when trying
>>>>>>>>>>>>>>> to savepoint.  We also use protobufs.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Has anyone found a better fix to this?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 5:21 AM Till Rohrmann <
>>>>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Glad to hear that you solved your problem. Afaik Flink
>>>>>>>>>>>>>>>> should not read the fields of messages and call hashCode on them.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <
>>>>>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I found my problem. It was indeed related to a mutable
>>>>>>>>>>>>>>>>> hashcode.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I was using a protobuf message in the key selector
>>>>>>>>>>>>>>>>> function, and one of the protobuf fields was an enum. I checked the
>>>>>>>>>>>>>>>>> implementation of the hashcode of the generated message: it uses the
>>>>>>>>>>>>>>>>> int value field of the protobuf message, so I assumed it is ok and
>>>>>>>>>>>>>>>>> immutable.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I replaced the key selector function to use Tuple[Long,
>>>>>>>>>>>>>>>>> Int] (since my protobuf message has only these two fields where the int
>>>>>>>>>>>>>>>>> parameter stands for the enum value field). After changing my code to use
>>>>>>>>>>>>>>>>> the Tuple it worked.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am not sure if Flink somehow reads the protobuf message
>>>>>>>>>>>>>>>>> fields and uses the hashcode of the fields directly since the generated
>>>>>>>>>>>>>>>>> protobuf enum indeed has a mutable hashcode (Enum.hashcode).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Nevertheless it's ok with the Tuple key.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for your response!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann <
>>>>>>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Rado,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> it is hard to tell the reason w/o a bit more details.
>>>>>>>>>>>>>>>>>> Could you share with us the complete logs of the problematic run? Also the
>>>>>>>>>>>>>>>>>> job you are running and the types of the state you are storing in RocksDB
>>>>>>>>>>>>>>>>>> and use as events in your job are very important. In the linked SO
>>>>>>>>>>>>>>>>>> question, the problem was a type whose hashcode was not immutable.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
>>>>>>>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I am running a Flink job that performs data enrichment.
>>>>>>>>>>>>>>>>>>> My job has 7 kafka consumers that receive messages for dml statements
>>>>>>>>>>>>>>>>>>> performed on 7 db tables.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Job setup:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>    - Flink is run in k8s in a similar way as it is
>>>>>>>>>>>>>>>>>>>    described here
>>>>>>>>>>>>>>>>>>>    <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>
>>>>>>>>>>>>>>>>>>>    .
>>>>>>>>>>>>>>>>>>>    - 1 job manager and 2 task managers
>>>>>>>>>>>>>>>>>>>    - parallelism is set to 4 and 2 task slots
>>>>>>>>>>>>>>>>>>>    - rocksdb as state backend
>>>>>>>>>>>>>>>>>>>    - protobuf for serialization
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Whenever I try to trigger a savepoint after my state is
>>>>>>>>>>>>>>>>>>> bootstrapped I get the following error for different operators:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Key group
>>>>>>>>>>>>>>>>>>> 0 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Note: key group might vary.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I found this
>>>>>>>>>>>>>>>>>>> <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange> article
>>>>>>>>>>>>>>>>>>> in Stackoverflow which relates to such an exception (btw my job graph looks
>>>>>>>>>>>>>>>>>>> similar to the one described in the article except that my job has more
>>>>>>>>>>>>>>>>>>> joins). I double checked my hashcodes and I think that they are fine.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I tried to reduce the parallelism to 1 with 1 task slot
>>>>>>>>>>>>>>>>>>> per task manager, and this configuration seems to work. This leads me
>>>>>>>>>>>>>>>>>>> to suspect some kind of concurrency issue.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I would like to understand what is causing the savepoint
>>>>>>>>>>>>>>>>>>> failure. Do you have any suggestions what I might be missing?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks in advance!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>

Re: savepoint failure

Posted by Dan Hill <qu...@gmail.com>.
I audited my code.  All of the directly coded keys are either Strings,
Longs or Tuple2<String, Long>.
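
(For concreteness, a deterministic key of that shape would come from a
selector roughly like the sketch below; "UserEvent" and its getters are
made-up names for illustration, not our actual code.)

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// Given some DataStream<UserEvent> named "events" (hypothetical type):
KeyedStream<UserEvent, Tuple2<String, Long>> keyed = events.keyBy(
    new KeySelector<UserEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> getKey(UserEvent e) {
            // Deterministic: built only from immutable String/long fields,
            // so the key group assignment stays stable across snapshots.
            return Tuple2.of(e.getUserId(), e.getEventTimeMillis());
        }
    });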

Does the FileSink use any of the incoming records as keys?  We use
ProtobufDatumWriter to write Protos to Avro files.  Could this have an
issue in it?

Does any part of Flink's setup try to create a key from a record
automatically?

// Required imports (from protobuf-java, avro, avro-protobuf, and flink-avro):
import com.google.protobuf.GeneratedMessageV3;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.protobuf.ProtobufData;
import org.apache.avro.protobuf.ProtobufDatumWriter;
import org.apache.flink.formats.avro.AvroBuilder;
import org.apache.flink.formats.avro.AvroWriterFactory;

// Builds a writer factory that serializes protobuf messages into
// Snappy-compressed Avro files, using a schema derived from the proto class.
private static <T extends GeneratedMessageV3> AvroWriterFactory<T>
        getAvroWriterFactory(Class<T> avroProtoClass) {
    return new AvroWriterFactory<T>((AvroBuilder<T>) out -> {
        // Derive the Avro schema from the generated protobuf class.
        Schema schema = ProtobufData.get().getSchema(avroProtoClass);
        ProtobufDatumWriter<T> pbWriter = new ProtobufDatumWriter<>(schema);
        DataFileWriter<T> dataFileWriter = new DataFileWriter<>(pbWriter);
        dataFileWriter.setCodec(CodecFactory.snappyCodec());
        dataFileWriter.create(schema, out); // writes the Avro header to `out`
        return dataFileWriter;
    });
}
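
For reference, here is a sketch of how a factory like this can plug into the
new FileSink; "MyProto", the path, and "stream" below are placeholders rather
than our real job:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// AvroWriterFactory implements BulkWriter.Factory, so it can be used
// as the bulk format of a FileSink.
FileSink<MyProto> sink = FileSink
    .forBulkFormat(new Path("s3a://bucket/avro-out"),
        getAvroWriterFactory(MyProto.class))
    .build();
stream.sinkTo(sink);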


On Sat, Aug 21, 2021 at 8:52 PM Dan Hill <qu...@gmail.com> wrote:

> Do people write checkpoint/savepoint recovery tests?  E.g. persist a
> checkpoint from a run and verify that it can be recovered?
>
> Also, I don't always hit this error when task managers restart.  I've had
> plenty of task managers die while running and savepoints usually work.
>
>
> On Sat, Aug 21, 2021 at 8:49 PM Dan Hill <qu...@gmail.com> wrote:
>
>> Darn, nevermind.  I was backfilling a job from the start of our time
>> period (not from any checkpoint/savepoint) and, when I tried to savepoint,
>> the job failed with "Checkpoint Coordinator is suspending."  Then when
>> I tried to savepoint again, I hit the key group error.
>>
>> 2021-08-22 03:35:42,305 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>       [] - Join UserViewRequestInsertionImpression -> (Sink Writer: S3
>> flat-user-impression -> Sink Committer: S3 flat-user-impression, Sink
>> Writer: S3 flat-user-impression fixed -> Sink Committer: S3
>> flat-user-impression fixed) (32/56) (167f59052298fc5d8e9c318958f3cfc4)
>> switched from RUNNING to FAILED on 10.12.101.133:6122-164771 @
>> flink-taskmanager-0.flink-taskmanager.default.svc.cluster.local
>> (dataPort=42979).
>>
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connection unexpectedly closed by remote task manager '
>> 10.12.100.55/10.12.100.55:43687'. This might indicate that the remote
>> task manager was lost.
>>
>>         at
>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
>>
>> 2021-08-22 03:35:42,419 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>       [] - Join UserViewRequestInsertionImpressionAction -> (Sink
>> Writer: S3 flat-user-action -> Sink Committer: S3 flat-user-action, Sink
>> Writer: S3 flat-user-action fixed -> Sink Committer: S3 flat-user-action
>> fixed, Map -> Sink: Kafka flat-user-action-json) (15/56)
>> (3077ef9c918af545a1165d0715b5e6c6) switched from CANCELING to CANCELED.
>>
>> 2021-08-22 03:35:42,419 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>       [] - Discarding the results produced by task execution
>> 3077ef9c918af545a1165d0715b5e6c6.
>>
>> 2021-08-22 03:35:42,422 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>>                 [] - Trying to recover from a global failure.
>>
>> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
>> Coordinator is suspending.
>>
>>         at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1740)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:47)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1812)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1326)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1298)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:582)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:291)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:275)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:258)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:234)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:449)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source)
>> ~[?:?]
>>
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_292]
>>
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>> ~[?:1.8.0_292]
>>
>>         at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at akka.actor.Actor.aroundReceive(Actor.scala:517)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>         at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> [flink-dist_2.12-1.12.3.jar:1.12.3]
>>
>>
>> On Fri, Aug 20, 2021 at 8:43 AM Dan Hill <qu...@gmail.com> wrote:
>>
>>> Thanks, Till!
>>>
>>> On Fri, Aug 20, 2021 at 8:31 AM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Dan, good to hear that you found the problem. What I would recommend
>>>> is to set the log level in the log4j.properties file to DEBUG or TRACE (but
>>>> this is quite noisy). If then the log does not contain the required
>>>> information then it is likely that we don't log it and, hence, would have
>>>> to be added.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Aug 20, 2021 at 5:20 PM Dan Hill <qu...@gmail.com> wrote:
>>>>
>>>>> I think this was from a breaking change we made to the key calculation
>>>>> in our code between version updates.  So this error makes sense.
>>>>>
>>>>> What's the best way to get more info for debugging?  How can I
>>>>> configure the logs to output more key information?
>>>>>
>>>>> On Fri, Jul 16, 2021 at 11:29 PM Dan Hill <qu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks, Till!
>>>>>>
>>>>>> On Thu, Jul 15, 2021 at 12:52 AM Till Rohrmann <tr...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Dan,
>>>>>>>
>>>>>>> From the logs I couldn't find anything suspicious. The job runs
>>>>>>> until you try to draw a savepoint. When doing this Flink fails with "Key
>>>>>>> group 0 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}". W/o
>>>>>>> having access to your job or a minimal example that allows to reproduce
>>>>>>> this problem, it will be super hard to figure out what's going wrong. My
>>>>>>> best guess would still be that we have a non-deterministic key somewhere.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Thu, Jul 15, 2021 at 7:26 AM Dan Hill <qu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I don't know if it matters but I'm using unaligned checkpoints.
>>>>>>>>
>>>>>>>> On Wed, Jul 14, 2021 at 8:33 PM Dan Hill <qu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Here's the overview flow chart.
>>>>>>>>>
>>>>>>>>> [image: Screen Shot 2021-07-14 at 8.24.33 PM.png]
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jul 14, 2021 at 7:10 PM Dan Hill <qu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> *-others*
>>>>>>>>>>
>>>>>>>>>> *Code*
>>>>>>>>>> I'm not sure of a good, secure way of sharing the java code.  It
>>>>>>>>>> depends on multiple internal repos.  The savepoint appears to be failing in
>>>>>>>>>> a custom KeyedCoProcessFunction that joins two keyed streams in a fuzzy
>>>>>>>>>> way.  The streams are joined based on a Tuple2<String, Long>, and the
>>>>>>>>>> function has some internal map state using String keys.
>>>>>>>>>>
>>>>>>>>>> *Flink config*
>>>>>>>>>> The most relevant parts of the flink config are the following:
>>>>>>>>>> state.backend.async: true
>>>>>>>>>> state.backend.incremental: true
>>>>>>>>>> state.backend.local-recovery: false
>>>>>>>>>> taskmanager.state.local.root-dirs: /flink_state/local-recovery
>>>>>>>>>> state.backend.rocksdb.checkpoint.transfer.thread.num: 1
>>>>>>>>>> state.backend.rocksdb.localdir: /flink_state/rocksdb
>>>>>>>>>> state.backend.rocksdb.options-factory:
>>>>>>>>>> org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
>>>>>>>>>> state.backend.rocksdb.predefined-options: DEFAULT
>>>>>>>>>> state.backend.rocksdb.timer-service.factory: ROCKSDB
>>>>>>>>>> state.backend.rocksdb.ttl.compaction.filter.enabled: false
>>>>>>>>>> state.checkpoints.dir: s3a://my-flink-state/checkpoints
>>>>>>>>>> state.savepoints.dir: s3a://my-metrics-flink-state/savepoints
>>>>>>>>>>
>>>>>>>>>> *Workflow*
>>>>>>>>>> What do you mean by workflow?
>>>>>>>>>>
>>>>>>>>>> *Logs*
>>>>>>>>>> Here's the job manager log.  The task manager log did not look
>>>>>>>>>> useful.
>>>>>>>>>>
>>>>>>>>>> https://drive.google.com/file/d/1jC5-3Bm2OP0dX1GJACwHGeqxd4snFc-W/view?usp=sharing
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 14, 2021 at 12:45 AM Till Rohrmann <
>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Dan,
>>>>>>>>>>>
>>>>>>>>>>> Can you provide us with more information about your job (maybe
>>>>>>>>>>> even the job code or a minimally working example), the Flink configuration,
>>>>>>>>>>> the exact workflow you are doing and the corresponding logs and error
>>>>>>>>>>> messages?
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Till
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jul 13, 2021 at 9:39 PM Dan Hill <qu...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Could this be caused by mixing of configuration settings when
>>>>>>>>>>>> running?  Running a job with one parallelism, stop/savepointing and then
>>>>>>>>>>>> recovering with a different parallelism?  I'd assume that's fine and
>>>>>>>>>>>> wouldn't create bad state.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jul 13, 2021 at 12:34 PM Dan Hill <
>>>>>>>>>>>> quietgolfer@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I checked my code.  Our keys for streams and map state only
>>>>>>>>>>>>> use either (1) string, (2) long IDs that don't change or (3) Tuple of 1 and
>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I don't know why my current case is breaking.  Our job
>>>>>>>>>>>>> partitions and parallelism settings have not changed.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jul 13, 2021 at 12:11 PM Dan Hill <
>>>>>>>>>>>>> quietgolfer@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey.  I just hit a similar error in production when trying to
>>>>>>>>>>>>>> savepoint.  We also use protobufs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Has anyone found a better fix to this?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 5:21 AM Till Rohrmann <
>>>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Glad to hear that you solved your problem. Afaik Flink
>>>>>>>>>>>>>>> should not read the fields of messages and call hashCode on them.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <
>>>>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I found my problem. It was indeed related to a mutable
>>>>>>>>>>>>>>>> hashcode.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I was using a protobuf message in the key selector function
>>>>>>>>>>>>>>>> and one of the protobuf fields was an enum. I checked the implementation of
>>>>>>>>>>>>>>>> the hashcode of the generated message and it is using the int value field
>>>>>>>>>>>>>>>> of the protobuf message so I assumed that it is ok and it's immutable.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I replaced the key selector function to use Tuple[Long,
>>>>>>>>>>>>>>>> Int] (since my protobuf message has only these two fields where the int
>>>>>>>>>>>>>>>> parameter stands for the enum value field). After changing my code to use
>>>>>>>>>>>>>>>> the Tuple it worked.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am not sure if Flink somehow reads the protobuf message
>>>>>>>>>>>>>>>> fields and uses the hashcode of the fields directly since the generated
>>>>>>>>>>>>>>>> protobuf enum indeed has a mutable hashcode (Enum.hashcode).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Nevertheless it's ok with the Tuple key.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for your response!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann <
>>>>>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Rado,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> it is hard to tell the reason w/o a bit more details.
>>>>>>>>>>>>>>>>> Could you share with us the complete logs of the problematic run? Also the
>>>>>>>>>>>>>>>>> job you are running and the types of the state you are storing in RocksDB
>>>>>>>>>>>>>>>>> and use as events in your job are very important. In the linked SO
>>>>>>>>>>>>>>>>> question, the problem was a type whose hashcode was not immutable.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
>>>>>>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am running a Flink job that performs data enrichment.
>>>>>>>>>>>>>>>>>> My job has 7 kafka consumers that receive messages for dml statements
>>>>>>>>>>>>>>>>>> performed for 7 db tables.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Job setup:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>    - Flink is run in k8s in a similar way as it is
>>>>>>>>>>>>>>>>>>    described here
>>>>>>>>>>>>>>>>>>    <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>
>>>>>>>>>>>>>>>>>>    .
>>>>>>>>>>>>>>>>>>    - 1 job manager and 2 task managers
>>>>>>>>>>>>>>>>>>    - parallelism is set to 4 and 2 task slots
>>>>>>>>>>>>>>>>>>    - rocksdb as state backend
>>>>>>>>>>>>>>>>>>    - protobuf for serialization
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Whenever I try to trigger a savepoint after my state is
>>>>>>>>>>>>>>>>>> bootstrapped I get the following error for different operators:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Key group
>>>>>>>>>>>>>>>>>> 0 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Note: key group might vary.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I found this
>>>>>>>>>>>>>>>>>> <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange> article
>>>>>>>>>>>>>>>>>> in Stackoverflow which relates to such an exception (btw my job graph looks
>>>>>>>>>>>>>>>>>> similar to the one described in the article except that my job has more
>>>>>>>>>>>>>>>>>> joins). I double checked my hashcodes and I think that they are fine.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I tried to reduce the parallelism to 1 with 1 task slot
>>>>>>>>>>>>>>>>>> per task manager and this configuration seems to work. This leads me to
>>>>>>>>>>>>>>>>>> suspect that it might be some concurrency issue.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I would like to understand what is causing the savepoint
>>>>>>>>>>>>>>>>>> failure. Do you have any suggestions what I might be missing?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks in advance!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>

Re: savepoint failure

Posted by Dan Hill <qu...@gmail.com>.
Do people write checkpoint/savepoint recovery tests?  E.g. persist a
checkpoint from a run and verify that it can be recovered?

Also, I don't always hit this error when task managers restart.  I've had
plenty of task managers die while running and savepoints usually work.
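
For concreteness, the kind of test I have in mind is a snapshot/restore
round trip with Flink's operator test harness (from the flink-streaming-java
test artifact).  A minimal sketch, where MyKeyedFunction, Event, and Output
are placeholders for the real job's types:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;

KeyedOneInputStreamOperatorTestHarness<String, Event, Output> harness =
        new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(new MyKeyedFunction()),
                e -> e.key,  // the same key selector as production
                Types.STRING);
harness.open();
harness.processElement(new Event("k1", 1L), 10L);

// Snapshot the operator, then bring up a fresh harness from that snapshot.
OperatorSubtaskState snapshot = harness.snapshot(1L, 20L);
harness.close();

KeyedOneInputStreamOperatorTestHarness<String, Event, Output> restored =
        new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(new MyKeyedFunction()),
                e -> e.key,
                Types.STRING);
restored.initializeState(snapshot);
restored.open();
// ...assert on the restored state / emitted records here...
restored.close();

It won't reproduce cluster-level behavior like this key group mismatch under
rescaling, but it does catch serializer and state-layout breaks between
versions.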


On Sat, Aug 21, 2021 at 8:49 PM Dan Hill <qu...@gmail.com> wrote:

> Darn, nevermind.  I was backfilling a job from the start of our time
> period (not from any checkpoint/savepoint) and, when I tried to savepoint,
> the job failed with "Checkpoint Coordinator is suspending.".  Then when I
> tried to savepoint again, I hit the key group error.
>
> 2021-08-22 03:35:42,305 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>       [] - Join UserViewRequestInsertionImpression -> (Sink Writer: S3
> flat-user-impression -> Sink Committer: S3 flat-user-impression, Sink
> Writer: S3 flat-user-impression fixed -> Sink Committer: S3
> flat-user-impression fixed) (32/56) (167f59052298fc5d8e9c318958f3cfc4)
> switched from RUNNING to FAILED on 10.12.101.133:6122-164771 @
> flink-taskmanager-0.flink-taskmanager.default.svc.cluster.local
> (dataPort=42979).
>
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager '
> 10.12.100.55/10.12.100.55:43687'. This might indicate that the remote
> task manager was lost.
>
>         at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
>
> 2021-08-22 03:35:42,419 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>       [] - Join UserViewRequestInsertionImpressionAction -> (Sink Writer:
> S3 flat-user-action -> Sink Committer: S3 flat-user-action, Sink Writer: S3
> flat-user-action fixed -> Sink Committer: S3 flat-user-action fixed, Map ->
> Sink: Kafka flat-user-action-json) (15/56)
> (3077ef9c918af545a1165d0715b5e6c6) switched from CANCELING to CANCELED.
>
> 2021-08-22 03:35:42,419 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>       [] - Discarding the results produced by task execution
> 3077ef9c918af545a1165d0715b5e6c6.
>
> 2021-08-22 03:35:42,422 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>                 [] - Trying to recover from a global failure.
>
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
> Coordinator is suspending.
>
>         at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1740)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:47)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1812)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1326)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1298)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:582)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:291)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:275)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:258)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:234)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:449)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source)
> ~[?:?]
>
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_292]
>
>         at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_292]
>
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> ~[flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at akka.actor.Actor.aroundReceive(Actor.scala:517)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>         at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.12.3.jar:1.12.3]
>
>
> On Fri, Aug 20, 2021 at 8:43 AM Dan Hill <qu...@gmail.com> wrote:
>
>> Thanks, Till!
>>
>> On Fri, Aug 20, 2021 at 8:31 AM Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Dan, good to hear that you found the problem. What I would recommend
>>> is to set the log level in the log4j.properties file to DEBUG or TRACE (but
>>> this is quite noisy). If then the log does not contain the required
>>> information then it is likely that we don't log it and, hence, would have
>>> to be added.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Aug 20, 2021 at 5:20 PM Dan Hill <qu...@gmail.com> wrote:
>>>
>>>> I think this was from a breaking change we made to the key calculation
>>>> in our code between version updates.  So this error makes sense.
>>>>
>>>> What's the best way to get more info for debugging?  How can I
>>>> configure the logs to output more key information?
>>>>
>>>> On Fri, Jul 16, 2021 at 11:29 PM Dan Hill <qu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks, Till!
>>>>>
>>>>> On Thu, Jul 15, 2021 at 12:52 AM Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Dan,
>>>>>>
>>>>>> From the logs I couldn't find anything suspicious. The job runs until
>>>>>> you try to draw a savepoint. When doing this Flink fails with "Key group 0
>>>>>> is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}". W/o having
>>>>>> access to your job or a minimal example that allows to reproduce this
>>>>>> problem, it will be super hard to figure out what's going wrong. My best
>>>>>> guess would still be that we have a non-deterministic key somewhere.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Thu, Jul 15, 2021 at 7:26 AM Dan Hill <qu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I don't know if it matters but I'm using unaligned checkpoints.
>>>>>>>
>>>>>>> On Wed, Jul 14, 2021 at 8:33 PM Dan Hill <qu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Here's the overview flow chart.
>>>>>>>>
>>>>>>>> [image: Screen Shot 2021-07-14 at 8.24.33 PM.png]
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jul 14, 2021 at 7:10 PM Dan Hill <qu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> *-others*
>>>>>>>>>
>>>>>>>>> *Code*
>>>>>>>>> I'm not sure of a good, secure way of sharing the java code.  It
>>>>>>>>> depends on multiple internal repos.  The savepoint appears to be failing in
>>>>>>>>> a custom KeyedCoProcessFunction that joins two keyed streams in a fuzzy
>>>>>>>>> way.  The streams are joined based on a Tuple2<String, Long>, and the
>>>>>>>>> function has some internal map state using String keys.
>>>>>>>>>
>>>>>>>>> *Flink config*
>>>>>>>>> The most relevant parts of the flink config are the following:
>>>>>>>>> state.backend.async: true
>>>>>>>>> state.backend.incremental: true
>>>>>>>>> state.backend.local-recovery: false
>>>>>>>>> taskmanager.state.local.root-dirs: /flink_state/local-recovery
>>>>>>>>> state.backend.rocksdb.checkpoint.transfer.thread.num: 1
>>>>>>>>> state.backend.rocksdb.localdir: /flink_state/rocksdb
>>>>>>>>> state.backend.rocksdb.options-factory:
>>>>>>>>> org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
>>>>>>>>> state.backend.rocksdb.predefined-options: DEFAULT
>>>>>>>>> state.backend.rocksdb.timer-service.factory: ROCKSDB
>>>>>>>>> state.backend.rocksdb.ttl.compaction.filter.enabled: false
>>>>>>>>> state.checkpoints.dir: s3a://my-flink-state/checkpoints
>>>>>>>>> state.savepoints.dir: s3a://my-metrics-flink-state/savepoints
>>>>>>>>>
>>>>>>>>> *Workflow*
>>>>>>>>> What do you mean by workflow?
>>>>>>>>>
>>>>>>>>> *Logs*
>>>>>>>>> Here's the job manager log.  The task manager log did not look
>>>>>>>>> useful.
>>>>>>>>>
>>>>>>>>> https://drive.google.com/file/d/1jC5-3Bm2OP0dX1GJACwHGeqxd4snFc-W/view?usp=sharing
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jul 14, 2021 at 12:45 AM Till Rohrmann <
>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Dan,
>>>>>>>>>>
>>>>>>>>>> Can you provide us with more information about your job (maybe
>>>>>>>>>> even the job code or a minimally working example), the Flink configuration,
>>>>>>>>>> the exact workflow you are doing and the corresponding logs and error
>>>>>>>>>> messages?
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Tue, Jul 13, 2021 at 9:39 PM Dan Hill <qu...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Could this be caused by mixing of configuration settings when
>>>>>>>>>>> running?  Running a job with one parallelism, stop/savepointing and then
>>>>>>>>>>> recovering with a different parallelism?  I'd assume that's fine and
>>>>>>>>>>> wouldn't create bad state.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jul 13, 2021 at 12:34 PM Dan Hill <qu...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I checked my code.  Our keys for streams and map state only use
>>>>>>>>>>>> either (1) string, (2) long IDs that don't change or (3) Tuple of 1 and 2.
>>>>>>>>>>>>
>>>>>>>>>>>> I don't know why my current case is breaking.  Our job
>>>>>>>>>>>> partitions and parallelism settings have not changed.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jul 13, 2021 at 12:11 PM Dan Hill <
>>>>>>>>>>>> quietgolfer@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hey.  I just hit a similar error in production when trying to
>>>>>>>>>>>>> savepoint.  We also use protobufs.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Has anyone found a better fix to this?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 5:21 AM Till Rohrmann <
>>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Glad to hear that you solved your problem. Afaik Flink should
>>>>>>>>>>>>>> not read the fields of messages and call hashCode on them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <
>>>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I found my problem. It was indeed related to a mutable
>>>>>>>>>>>>>>> hashcode.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I was using a protobuf message in the key selector function
>>>>>>>>>>>>>>> and one of the protobuf fields was an enum. I checked the implementation of
>>>>>>>>>>>>>>> the hashcode of the generated message and it is using the int value field
>>>>>>>>>>>>>>> of the protobuf message so I assumed that it is ok and it's immutable.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I replaced the key selector function to use Tuple[Long, Int]
>>>>>>>>>>>>>>> (since my protobuf message has only these two fields where the int
>>>>>>>>>>>>>>> parameter stands for the enum value field). After changing my code to use
>>>>>>>>>>>>>>> the Tuple it worked.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am not sure if Flink somehow reads the protobuf message
>>>>>>>>>>>>>>> fields and uses the hashcode of the fields directly since the generated
>>>>>>>>>>>>>>> protobuf enum indeed has a mutable hashcode (Enum.hashcode).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Nevertheless it's ok with the Tuple key.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for your response!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann <
>>>>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Rado,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> it is hard to tell the reason w/o a bit more details. Could
>>>>>>>>>>>>>>>> you share with us the complete logs of the problematic run? Also the job
>>>>>>>>>>>>>>>> you are running and the types of the state you are storing in RocksDB and
>>>>>>>>>>>>>>>> use as events in your job are very important. In the linked SO question,
>>>>>>>>>>>>>>>> the problem was a type whose hashcode was not immutable.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
>>>>>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am running a Flink job that performs data enrichment. My
>>>>>>>>>>>>>>>>> job has 7 kafka consumers that receive messages for dml statements
>>>>>>>>>>>>>>>>> performed for 7 db tables.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Job setup:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>    - Flink is run in k8s in a similar way as it is
>>>>>>>>>>>>>>>>>    described here
>>>>>>>>>>>>>>>>>    <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>
>>>>>>>>>>>>>>>>>    .
>>>>>>>>>>>>>>>>>    - 1 job manager and 2 task managers
>>>>>>>>>>>>>>>>>    - parallelism is set to 4 and 2 task slots
>>>>>>>>>>>>>>>>>    - rocksdb as state backend
>>>>>>>>>>>>>>>>>    - protobuf for serialization
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Whenever I try to trigger a savepoint after my state is
>>>>>>>>>>>>>>>>> bootstrapped I get the following error for different operators:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Key group 0
>>>>>>>>>>>>>>>>> is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Note: key group might vary.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I found this
>>>>>>>>>>>>>>>>> <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange> article
>>>>>>>>>>>>>>>>> in Stackoverflow which relates to such an exception (btw my job graph looks
>>>>>>>>>>>>>>>>> similar to the one described in the article except that my job has more
>>>>>>>>>>>>>>>>> joins). I double checked my hashcodes and I think that they are fine.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I tried to reduce the parallelism to 1 with 1 task slot
>>>>>>>>>>>>>>>>> per task manager and this configuration seems to work. This leads me to
>>>>>>>>>>>>>>>>> suspect that it might be some concurrency issue.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I would like to understand what is causing the savepoint
>>>>>>>>>>>>>>>>> failure. Do you have any suggestions what I might be missing?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks in advance!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>

Re: savepoint failure

Posted by Dan Hill <qu...@gmail.com>.
Darn, nevermind.  I was backfilling a job from the start of our time period
(not from any checkpoint/savepoint) and, when I tried to savepoint, the job
failed with "Checkpoint Coordinator is suspending.".  Then when I tried to
savepoint again, I hit the key group error.
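
For anyone trying to reproduce this: a savepoint can be triggered, and later
resumed from, with the standard CLI, roughly as follows (the job id,
savepoint name, and jar are placeholders):

./bin/flink savepoint <jobId> s3a://my-metrics-flink-state/savepoints
./bin/flink run -s s3a://my-metrics-flink-state/savepoints/savepoint-<...> my-job.jar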

2021-08-22 03:35:42,305 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
      [] - Join UserViewRequestInsertionImpression -> (Sink Writer: S3
flat-user-impression -> Sink Committer: S3 flat-user-impression, Sink
Writer: S3 flat-user-impression fixed -> Sink Committer: S3
flat-user-impression fixed) (32/56) (167f59052298fc5d8e9c318958f3cfc4)
switched from RUNNING to FAILED on 10.12.101.133:6122-164771 @
flink-taskmanager-0.flink-taskmanager.default.svc.cluster.local
(dataPort=42979).

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connection unexpectedly closed by remote task manager '
10.12.100.55/10.12.100.55:43687'. This might indicate that the remote task
manager was lost.

        at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]

2021-08-22 03:35:42,419 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
      [] - Join UserViewRequestInsertionImpressionAction -> (Sink Writer:
S3 flat-user-action -> Sink Committer: S3 flat-user-action, Sink Writer: S3
flat-user-action fixed -> Sink Committer: S3 flat-user-action fixed, Map ->
Sink: Kafka flat-user-action-json) (15/56)
(3077ef9c918af545a1165d0715b5e6c6) switched from CANCELING to CANCELED.

2021-08-22 03:35:42,419 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
      [] - Discarding the results produced by task execution
3077ef9c918af545a1165d0715b5e6c6.

2021-08-22 03:35:42,422 INFO  org.apache.flink.runtime.jobmaster.JobMaster
              [] - Trying to recover from a global failure.

org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
Coordinator is suspending.

        at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1740)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:47)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1812)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1326)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1298)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:582)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:291)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:275)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:258)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:234)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:449)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source)
~[?:?]

        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_292]

        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_292]

        at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
~[flink-dist_2.12-1.12.3.jar:1.12.3]

        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at akka.actor.Actor.aroundReceive(Actor.scala:517)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.12-1.12.3.jar:1.12.3]

        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.12-1.12.3.jar:1.12.3]
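
As an aside, the underlying fix here (and in Rado's case quoted below) is
making the key derivation deterministic. A rough sketch of a KeySelector in
that spirit, keying by a Tuple2 of stable primitives instead of a protobuf
message whose enum field has an identity-based hashCode; "Event" and its
accessors are hypothetical stand-ins:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

public class EventKeySelector implements KeySelector<Event, Tuple2<Long, Integer>> {
    @Override
    public Tuple2<Long, Integer> getKey(Event e) {
        // Use the enum's stable wire number, not the enum object itself:
        // generated protobuf enums inherit Enum.hashCode, which is
        // identity-based and can differ across JVM runs.
        return Tuple2.of(e.getUserId(), e.getActionType().getNumber());
    }
}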

On Fri, Aug 20, 2021 at 8:43 AM Dan Hill <qu...@gmail.com> wrote:

> Thanks, Till!
>
> On Fri, Aug 20, 2021 at 8:31 AM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Dan, good to hear that you found the problem. What I would recommend
>> is to set the log level in the log4j.properties file to DEBUG or TRACE (but
>> this is quite noisy). If the log then still does not contain the required
>> information, it is likely that we don't log it and, hence, it would have
>> to be added.
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 20, 2021 at 5:20 PM Dan Hill <qu...@gmail.com> wrote:
>>
>>> I think this was from a breaking change we made to the key calculation
>>> in our code between version updates.  So this error makes sense.
>>>
>>> What's the best way to get more info for debugging?  How can I configure
>>> the logs to output more key information?
>>>
>>> On Fri, Jul 16, 2021 at 11:29 PM Dan Hill <qu...@gmail.com> wrote:
>>>
>>>> Thanks, Till!
>>>>
>>>> On Thu, Jul 15, 2021 at 12:52 AM Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Dan,
>>>>>
>>>>> From the logs I couldn't find anything suspicious. The job runs until
>>>>> you try to draw a savepoint. When doing this Flink fails with "Key group 0
>>>>> is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}". W/o having
>>>>> access to your job or a minimal example that allows to reproduce this
>>>>> problem, it will be super hard to figure out what's going wrong. My best
>>>>> guess would still be that we have a non deterministic key somewhere.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Jul 15, 2021 at 7:26 AM Dan Hill <qu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I don't know if it matters but I'm using unaligned checkpoints.
>>>>>>
>>>>>> On Wed, Jul 14, 2021 at 8:33 PM Dan Hill <qu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Here's the overview flow chart.
>>>>>>>
>>>>>>> [image: Screen Shot 2021-07-14 at 8.24.33 PM.png]
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 14, 2021 at 7:10 PM Dan Hill <qu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> *-others*
>>>>>>>>
>>>>>>>> *Code*
>>>>>>>> I'm not sure of a good, secure way of sharing the java code.  It
>>>>>>>> depends on multiple internal repos.  The savepoint appears to be failing in
>>>>>>>> a custom KeyedCoProcessFunction that joins two keyed streams in a fuzzy
>>>>>>>> way.  The streams are joined based on a Tuple2<String, Long> and has some
>>>>>>>> internal map state using String keys.
>>>>>>>>
>>>>>>>> *Flink config*
>>>>>>>> The most relevant parts of the flink config are the following:
>>>>>>>> state.backend.async: true
>>>>>>>> state.backend.incremental: true
>>>>>>>> state.backend.local-recovery: false
>>>>>>>> taskmanager.state.local.root-dirs: /flink_state/local-recovery
>>>>>>>> state.backend.rocksdb.checkpoint.transfer.thread.num: 1
>>>>>>>> state.backend.rocksdb.localdir: /flink_state/rocksdb
>>>>>>>> state.backend.rocksdb.options-factory:
>>>>>>>> org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
>>>>>>>> state.backend.rocksdb.predefined-options: DEFAULT
>>>>>>>> state.backend.rocksdb.timer-service.factory: ROCKSDB
>>>>>>>> state.backend.rocksdb.ttl.compaction.filter.enabled: false
>>>>>>>> state.checkpoints.dir: s3a://my-flink-state/checkpoints
>>>>>>>> state.savepoints.dir: s3a://my-metrics-flink-state/savepoints
>>>>>>>>
>>>>>>>> *Workflow*
>>>>>>>> What do you mean by workflow?
>>>>>>>>
>>>>>>>> *Logs*
>>>>>>>> Here's the job manager log.  The task manager log did not look
>>>>>>>> useful.
>>>>>>>>
>>>>>>>> https://drive.google.com/file/d/1jC5-3Bm2OP0dX1GJACwHGeqxd4snFc-W/view?usp=sharing
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jul 14, 2021 at 12:45 AM Till Rohrmann <
>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Dan,
>>>>>>>>>
>>>>>>>>> Can you provide us with more information about your job (maybe
>>>>>>>>> even the job code or a minimally working example), the Flink configuration,
>>>>>>>>> the exact workflow you are doing and the corresponding logs and error
>>>>>>>>> messages?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Tue, Jul 13, 2021 at 9:39 PM Dan Hill <qu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Could this be caused by mixing of configuration settings when
>>>>>>>>>> running?  Running a job with one parallelism, stop/savepointing and then
>>>>>>>>>> recovering with a different parallelism?  I'd assume that's fine and
>>>>>>>>>> wouldn't create bad state.
>>>>>>>>>>
>>>>>>>>>> On Tue, Jul 13, 2021 at 12:34 PM Dan Hill <qu...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I checked my code.  Our keys for streams and map state only use
>>>>>>>>>>> either (1) string, (2) long IDs that don't change or (3) Tuple of 1 and 2.
>>>>>>>>>>>
>>>>>>>>>>> I don't know why my current case is breaking.  Our job
>>>>>>>>>>> partitions and parallelism settings have not changed.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jul 13, 2021 at 12:11 PM Dan Hill <qu...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey.  I just hit a similar error in production when trying to
>>>>>>>>>>>> savepoint.  We also use protobufs.
>>>>>>>>>>>>
>>>>>>>>>>>> Has anyone found a better fix to this?
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 23, 2020 at 5:21 AM Till Rohrmann <
>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Glad to hear that you solved your problem. Afaik Flink should
>>>>>>>>>>>>> not read the fields of messages and call hashCode on them.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Till
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <
>>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I found my problem. It was indeed related to a mutable
>>>>>>>>>>>>>> hashcode.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I was using a protobuf message in the key selector function
>>>>>>>>>>>>>> and one of the protobuf fields was an enum. I checked the implementation of
>>>>>>>>>>>>>> the hashcode of the generated message and it is using the int value field
>>>>>>>>>>>>>> of the protobuf message so I assumed that it is ok and it's immutable.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I replaced the key selector function to use Tuple[Long, Int]
>>>>>>>>>>>>>> (since my protobuf message has only these two fields where the int
>>>>>>>>>>>>>> parameter stands for the enum value field). After changing my code to use
>>>>>>>>>>>>>> the Tuple it worked.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am not sure if Flink somehow reads the protobuf message
>>>>>>>>>>>>>> fields and uses the hashcode of the fields directly since the generated
>>>>>>>>>>>>>> protobuf enum indeed has a mutable hashcode (Enum.hashCode).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Nevertheless it's ok with the Tuple key.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your response!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann <
>>>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Rado,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> it is hard to tell the reason w/o a bit more details. Could
>>>>>>>>>>>>>>> you share with us the complete logs of the problematic run? Also the job
>>>>>>>>>>>>>>> you are running and the types of the state you are storing in RocksDB and
>>>>>>>>>>>>>>> use as events in your job are very important. In the linked SO question,
>>>>>>>>>>>>>>> the problem was a type whose hashcode was not immutable.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
>>>>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am running a Flink job that performs data enrichment. My
>>>>>>>>>>>>>>>> job has 7 kafka consumers that receive messages for dml statements
>>>>>>>>>>>>>>>> performed for 7 db tables.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Job setup:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>    - Flink is run in k8s in a similar way as it is
>>>>>>>>>>>>>>>>    described here
>>>>>>>>>>>>>>>>    <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>
>>>>>>>>>>>>>>>>    .
>>>>>>>>>>>>>>>>    - 1 job manager and 2 task managers
>>>>>>>>>>>>>>>>    - parallelism is set to 4, with 2 task slots
>>>>>>>>>>>>>>>>    - rocksdb as state backend
>>>>>>>>>>>>>>>>    - protobuf for serialization
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Whenever I try to trigger a savepoint after my state is
>>>>>>>>>>>>>>>> bootstrapped I get the following error for different operators:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Key group 0
>>>>>>>>>>>>>>>> is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Note: key group might vary.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I found this
>>>>>>>>>>>>>>>> <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange> article
>>>>>>>>>>>>>>>> in Stackoverflow which relates to such an exception (btw my job graph looks
>>>>>>>>>>>>>>>> similar to the one described in the article except that my job has more
>>>>>>>>>>>>>>>> joins). I double checked my hashcodes and I think that they are fine.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I tried to reduce the parallelism to 1 with 1 task slot per
>>>>>>>>>>>>>>>> task manager and this configuration seems to work. This leads me to
>>>>>>>>>>>>>>>> suspect that it might be some concurrency issue.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I would like to understand what is causing the savepoint
>>>>>>>>>>>>>>>> failure. Do you have any suggestions what I might be missing?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks in advance!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>

Re: savepoint failure

Posted by Dan Hill <qu...@gmail.com>.
Thanks, Till!

On Fri, Aug 20, 2021 at 8:31 AM Till Rohrmann <tr...@apache.org> wrote:

> Hi Dan, good to hear that you found the problem. What I would recommend is
> to set the log level in the log4j.properties file to DEBUG or TRACE (but
> this is quite noisy). If the log then still does not contain the required
> information, it is likely that we don't log it and, hence, it would have
> to be added.
>
> Cheers,
> Till
>
> On Fri, Aug 20, 2021 at 5:20 PM Dan Hill <qu...@gmail.com> wrote:
>
>> I think this was from a breaking change we made to the key calculation in
>> our code between version updates.  So this error makes sense.
>>
>> What's the best way to get more info for debugging?  How can I configure
>> the logs to output more key information?
>>
>> On Fri, Jul 16, 2021 at 11:29 PM Dan Hill <qu...@gmail.com> wrote:
>>
>>> Thanks, Till!
>>>
>>> On Thu, Jul 15, 2021 at 12:52 AM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Dan,
>>>>
>>>> From the logs I couldn't find anything suspicious. The job runs until
>>>> you try to draw a savepoint. When doing this Flink fails with "Key group 0
>>>> is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}". W/o having
>>>> access to your job or a minimal example that allows to reproduce this
>>>> problem, it will be super hard to figure out what's going wrong. My best
>>>> guess would still be that we have a non deterministic key somewhere.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Jul 15, 2021 at 7:26 AM Dan Hill <qu...@gmail.com> wrote:
>>>>
>>>>> I don't know if it matters but I'm using unaligned checkpoints.
>>>>>
>>>>> On Wed, Jul 14, 2021 at 8:33 PM Dan Hill <qu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Here's the overview flow chart.
>>>>>>
>>>>>> [image: Screen Shot 2021-07-14 at 8.24.33 PM.png]
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Jul 14, 2021 at 7:10 PM Dan Hill <qu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> *-others*
>>>>>>>
>>>>>>> *Code*
>>>>>>> I'm not sure of a good, secure way of sharing the java code.  It
>>>>>>> depends on multiple internal repos.  The savepoint appears to be failing in
>>>>>>> a custom KeyedCoProcessFunction that joins two keyed streams in a fuzzy
>>>>>>> way.  The streams are joined based on a Tuple2<String, Long> and has some
>>>>>>> internal map state using String keys.
>>>>>>>
>>>>>>> *Flink config*
>>>>>>> The most relevant parts of the flink config are the following:
>>>>>>> state.backend.async: true
>>>>>>> state.backend.incremental: true
>>>>>>> state.backend.local-recovery: false
>>>>>>> taskmanager.state.local.root-dirs: /flink_state/local-recovery
>>>>>>> state.backend.rocksdb.checkpoint.transfer.thread.num: 1
>>>>>>> state.backend.rocksdb.localdir: /flink_state/rocksdb
>>>>>>> state.backend.rocksdb.options-factory:
>>>>>>> org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
>>>>>>> state.backend.rocksdb.predefined-options: DEFAULT
>>>>>>> state.backend.rocksdb.timer-service.factory: ROCKSDB
>>>>>>> state.backend.rocksdb.ttl.compaction.filter.enabled: false
>>>>>>> state.checkpoints.dir: s3a://my-flink-state/checkpoints
>>>>>>> state.savepoints.dir: s3a://my-metrics-flink-state/savepoints
>>>>>>>
>>>>>>> *Workflow*
>>>>>>> What do you mean by workflow?
>>>>>>>
>>>>>>> *Logs*
>>>>>>> Here's the job manager log.  The task manager log did not look
>>>>>>> useful.
>>>>>>>
>>>>>>> https://drive.google.com/file/d/1jC5-3Bm2OP0dX1GJACwHGeqxd4snFc-W/view?usp=sharing
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 14, 2021 at 12:45 AM Till Rohrmann <tr...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Dan,
>>>>>>>>
>>>>>>>> Can you provide us with more information about your job (maybe even
>>>>>>>> the job code or a minimally working example), the Flink configuration, the
>>>>>>>> exact workflow you are doing and the corresponding logs and error messages?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Tue, Jul 13, 2021 at 9:39 PM Dan Hill <qu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Could this be caused by mixing of configuration settings when
>>>>>>>>> running?  Running a job with one parallelism, stop/savepointing and then
>>>>>>>>> recovering with a different parallelism?  I'd assume that's fine and
>>>>>>>>> wouldn't create bad state.
>>>>>>>>>
>>>>>>>>> On Tue, Jul 13, 2021 at 12:34 PM Dan Hill <qu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I checked my code.  Our keys for streams and map state only use
>>>>>>>>>> either (1) string, (2) long IDs that don't change or (3) Tuple of 1 and 2.
>>>>>>>>>>
>>>>>>>>>> I don't know why my current case is breaking.  Our job partitions
>>>>>>>>>> and parallelism settings have not changed.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jul 13, 2021 at 12:11 PM Dan Hill <qu...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey.  I just hit a similar error in production when trying to
>>>>>>>>>>> savepoint.  We also use protobufs.
>>>>>>>>>>>
>>>>>>>>>>> Has anyone found a better fix to this?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 23, 2020 at 5:21 AM Till Rohrmann <
>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Glad to hear that you solved your problem. Afaik Flink should
>>>>>>>>>>>> not read the fields of messages and call hashCode on them.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Till
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <
>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I found my problem. It was indeed related to a mutable
>>>>>>>>>>>>> hashcode.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I was using a protobuf message in the key selector function
>>>>>>>>>>>>> and one of the protobuf fields was an enum. I checked the implementation of
>>>>>>>>>>>>> the hashcode of the generated message and it is using the int value field
>>>>>>>>>>>>> of the protobuf message so I assumed that it is ok and it's immutable.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I replaced the key selector function to use Tuple[Long, Int]
>>>>>>>>>>>>> (since my protobuf message has only these two fields where the int
>>>>>>>>>>>>> parameter stands for the enum value field). After changing my code to use
>>>>>>>>>>>>> the Tuple it worked.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am not sure if Flink somehow reads the protobuf message
>>>>>>>>>>>>> fields and uses the hashcode of the fields directly since the generated
>>>>>>>>>>>>> protobuf enum indeed has a mutable hashcode (Enum.hashCode).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Nevertheless it's ok with the Tuple key.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your response!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann <
>>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Rado,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> it is hard to tell the reason w/o a bit more details. Could
>>>>>>>>>>>>>> you share with us the complete logs of the problematic run? Also the job
>>>>>>>>>>>>>> you are running and the types of the state you are storing in RocksDB and
>>>>>>>>>>>>>> use as events in your job are very important. In the linked SO question,
>>>>>>>>>>>>>> the problem was a type whose hashcode was not immutable.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
>>>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am running a Flink job that performs data enrichment. My
>>>>>>>>>>>>>>> job has 7 kafka consumers that receive messages for dml statements
>>>>>>>>>>>>>>> performed for 7 db tables.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Job setup:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    - Flink is run in k8s in a similar way as it is
>>>>>>>>>>>>>>>    described here
>>>>>>>>>>>>>>>    <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>
>>>>>>>>>>>>>>>    .
>>>>>>>>>>>>>>>    - 1 job manager and 2 task managers
>>>>>>>>>>>>>>>    - parallelism is set to 4, with 2 task slots
>>>>>>>>>>>>>>>    - rocksdb as state backend
>>>>>>>>>>>>>>>    - protobuf for serialization
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Whenever I try to trigger a savepoint after my state is
>>>>>>>>>>>>>>> bootstrapped I get the following error for different operators:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Key group 0
>>>>>>>>>>>>>>> is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Note: key group might vary.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I found this
>>>>>>>>>>>>>>> <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange> article
>>>>>>>>>>>>>>> in Stackoverflow which relates to such an exception (btw my job graph looks
>>>>>>>>>>>>>>> similar to the one described in the article except that my job has more
>>>>>>>>>>>>>>> joins). I double checked my hashcodes and I think that they are fine.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I tried to reduce the parallelism to 1 with 1 task slot per
>>>>>>>>>>>>>>> task manager and this configuration seems to work. This leads me to
>>>>>>>>>>>>>>> suspect that it might be some concurrency issue.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I would like to understand what is causing the savepoint
>>>>>>>>>>>>>>> failure. Do you have any suggestions what I might be missing?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks in advance!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>

Re: savepoint failure

Posted by Till Rohrmann <tr...@apache.org>.
Hi Dan, good to hear that you found the problem. What I would recommend is
to set the log level in the log4j.properties file to DEBUG or TRACE (but
this is quite noisy). If the log then still does not contain the required
information, it is likely that we don't log it and, hence, it would have
to be added.
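
For example, a minimal sketch assuming the Log4j 2 properties format that
Flink 1.12 ships by default (the logger ids like "state" are arbitrary; the
package names match the classes in the stack traces in this thread):

# Raise overall verbosity (TRACE is very noisy)
rootLogger.level = DEBUG

# Or target just the keyed-state and RocksDB snapshot code paths
logger.state.name = org.apache.flink.runtime.state
logger.state.level = TRACE
logger.rocksdb.name = org.apache.flink.contrib.streaming.state
logger.rocksdb.level = TRACE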

Cheers,
Till

On Fri, Aug 20, 2021 at 5:20 PM Dan Hill <qu...@gmail.com> wrote:

> I think this was from a breaking change we made to the key calculation in
> our code between version updates.  So this error makes sense.
>
> What's the best way to get more info for debugging?  How can I configure
> the logs to output more key information?
>
> On Fri, Jul 16, 2021 at 11:29 PM Dan Hill <qu...@gmail.com> wrote:
>
>> Thanks, Till!
>>
>> On Thu, Jul 15, 2021 at 12:52 AM Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Dan,
>>>
>>> From the logs I couldn't find anything suspicious. The job runs until
>>> you try to draw a savepoint. When doing this Flink fails with "Key group 0
>>> is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}". W/o having
>>> access to your job or a minimal example that allows to reproduce this
>>> problem, it will be super hard to figure out what's going wrong. My best
>>> guess would still be that we have a non deterministic key somewhere.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Jul 15, 2021 at 7:26 AM Dan Hill <qu...@gmail.com> wrote:
>>>
>>>> I don't know if it matters but I'm using unaligned checkpoints.
>>>>
>>>> On Wed, Jul 14, 2021 at 8:33 PM Dan Hill <qu...@gmail.com> wrote:
>>>>
>>>>> Here's the overview flow chart.
>>>>>
>>>>> [image: Screen Shot 2021-07-14 at 8.24.33 PM.png]
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jul 14, 2021 at 7:10 PM Dan Hill <qu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> *-others*
>>>>>>
>>>>>> *Code*
>>>>>> I'm not sure of a good, secure way of sharing the java code.  It
>>>>>> depends on multiple internal repos.  The savepoint appears to be failing in
>>>>>> a custom KeyedCoProcessFunction that joins two keyed streams in a fuzzy
>>>>>> way.  The streams are joined based on a Tuple2<String, Long> and has some
>>>>>> internal map state using String keys.
>>>>>>
>>>>>> *Flink config*
>>>>>> The most relevant parts of the flink config are the following:
>>>>>> state.backend.async: true
>>>>>> state.backend.incremental: true
>>>>>> state.backend.local-recovery: false
>>>>>> taskmanager.state.local.root-dirs: /flink_state/local-recovery
>>>>>> state.backend.rocksdb.checkpoint.transfer.thread.num: 1
>>>>>> state.backend.rocksdb.localdir: /flink_state/rocksdb
>>>>>> state.backend.rocksdb.options-factory:
>>>>>> org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
>>>>>> state.backend.rocksdb.predefined-options: DEFAULT
>>>>>> state.backend.rocksdb.timer-service.factory: ROCKSDB
>>>>>> state.backend.rocksdb.ttl.compaction.filter.enabled: false
>>>>>> state.checkpoints.dir: s3a://my-flink-state/checkpoints
>>>>>> state.savepoints.dir: s3a://my-metrics-flink-state/savepoints
>>>>>>
>>>>>> *Workflow*
>>>>>> What do you mean by workflow?
>>>>>>
>>>>>> *Logs*
>>>>>> Here's the job manager log.  The task manager log did not look useful.
>>>>>>
>>>>>> https://drive.google.com/file/d/1jC5-3Bm2OP0dX1GJACwHGeqxd4snFc-W/view?usp=sharing
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Jul 14, 2021 at 12:45 AM Till Rohrmann <tr...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Dan,
>>>>>>>
>>>>>>> Can you provide us with more information about your job (maybe even
>>>>>>> the job code or a minimally working example), the Flink configuration, the
>>>>>>> exact workflow you are doing and the corresponding logs and error messages?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Tue, Jul 13, 2021 at 9:39 PM Dan Hill <qu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Could this be caused by mixing of configuration settings when
>>>>>>>> running?  Running a job with one parallelism, stop/savepointing and then
>>>>>>>> recovering with a different parallelism?  I'd assume that's fine and
>>>>>>>> wouldn't create bad state.
>>>>>>>>
>>>>>>>> On Tue, Jul 13, 2021 at 12:34 PM Dan Hill <qu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I checked my code.  Our keys for streams and map state only use
>>>>>>>>> either (1) string, (2) long IDs that don't change or (3) Tuple of 1 and 2.
>>>>>>>>>
>>>>>>>>> I don't know why my current case is breaking.  Our job partitions
>>>>>>>>> and parallelism settings have not changed.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jul 13, 2021 at 12:11 PM Dan Hill <qu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hey.  I just hit a similar error in production when trying to
>>>>>>>>>> savepoint.  We also use protobufs.
>>>>>>>>>>
>>>>>>>>>> Has anyone found a better fix to this?
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 23, 2020 at 5:21 AM Till Rohrmann <
>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Glad to hear that you solved your problem. Afaik Flink should
>>>>>>>>>>> not read the fields of messages and call hashCode on them.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Till
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <
>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>
>>>>>>>>>>>> I found my problem. It was indeed related to a mutable hashcode.
>>>>>>>>>>>>
>>>>>>>>>>>> I was using a protobuf message in the key selector function and
>>>>>>>>>>>> one of the protobuf fields was an enum. I checked the implementation of the
>>>>>>>>>>>> hashcode of the generated message and it is using the int value field of
>>>>>>>>>>>> the protobuf message so I assumed that it is ok and it's immutable.
>>>>>>>>>>>>
>>>>>>>>>>>> I replaced the key selector function to use Tuple[Long, Int]
>>>>>>>>>>>> (since my protobuf message has only these two fields where the int
>>>>>>>>>>>> parameter stands for the enum value field). After changing my code to use
>>>>>>>>>>>> the Tuple it worked.
>>>>>>>>>>>>
>>>>>>>>>>>> I am not sure if Flink somehow reads the protobuf message
>>>>>>>>>>>> fields and uses the hashcode of the fields directly since the generated
>>>>>>>>>>>> protobuf enum indeed has a mutable hashcode (Enum.hashCode).
>>>>>>>>>>>>
>>>>>>>>>>>> Nevertheless it's ok with the Tuple key.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your response!
>>>>>>>>>>>>
>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>> Rado
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann <
>>>>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Rado,
>>>>>>>>>>>>>
>>>>>>>>>>>>> it is hard to tell the reason w/o a bit more details. Could
>>>>>>>>>>>>> you share with us the complete logs of the problematic run? Also the job
>>>>>>>>>>>>> you are running and the types of the state you are storing in RocksDB and
>>>>>>>>>>>>> use as events in your job are very important. In the linked SO question,
>>>>>>>>>>>>> the problem was a type whose hashcode was not immutable.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Till
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
>>>>>>>>>>>>> radoslav.smilyanov@smule.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am running a Flink job that performs data enrichment. My
>>>>>>>>>>>>>> job has 7 kafka consumers that receive messages for dml statements
>>>>>>>>>>>>>> performed for 7 db tables.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Job setup:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    - Flink is run in k8s in a similar way as it is described
>>>>>>>>>>>>>>    here
>>>>>>>>>>>>>>    <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>
>>>>>>>>>>>>>>    .
>>>>>>>>>>>>>>    - 1 job manager and 2 task managers
>>>>>>>>>>>>>>    - parallelism is set to 4, with 2 task slots
>>>>>>>>>>>>>>    - rocksdb as state backend
>>>>>>>>>>>>>>    - protobuf for serialization
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Whenever I try to trigger a savepoint after my state is
>>>>>>>>>>>>>> bootstrapped I get the following error for different operators:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Key group 0 is
>>>>>>>>>>>>>> not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Note: key group might vary.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I found this
>>>>>>>>>>>>>> <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange> article
>>>>>>>>>>>>>> in Stackoverflow which relates to such an exception (btw my job graph looks
>>>>>>>>>>>>>> similar to the one described in the article except that my job has more
>>>>>>>>>>>>>> joins). I double checked my hashcodes and I think that they are fine.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I tried to reduce the parallelism to 1 with 1 task slot per
>>>>>>>>>>>>>> task manager and this configuration seems to work. This leads me to
>>>>>>>>>>>>>> suspect that it might be some concurrency issue.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would like to understand what is causing the savepoint
>>>>>>>>>>>>>> failure. Do you have any suggestions what I might be missing?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks in advance!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>> Rado
>>>>>>>>>>>>>>
>>>>>>>>>>>>>