Posted to issues@flink.apache.org by "Ori Popowski (Jira)" <ji...@apache.org> on 2020/07/20 07:07:00 UTC

[jira] [Comment Edited] (FLINK-18637) Key group is not in KeyGroupRange

    [ https://issues.apache.org/jira/browse/FLINK-18637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17160961#comment-17160961 ] 

Ori Popowski edited comment on FLINK-18637 at 7/20/20, 7:06 AM:
----------------------------------------------------------------

[~yunta]

The error starts happening after the job has run for some time (maybe 10 hours), and after that it happens every time.

The key selector is bound to the data itself, i.e. it extracts the session ID, the user ID and another integer, and builds a string from them: a {{List}} of those three, joined with {{mkString("/")}}. In any case, it's completely deterministic. Is it possible that the string's length might cause a problem?
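
To make the description above concrete, here is a minimal Java sketch of a key built that way. The class name, method name and sample values are hypothetical, and the real job builds the key in Scala with {{mkString("/")}}. The sketch only illustrates that such a key yields the same string and the same {{hashCode}} on every call, whatever its length.

```java
public class SessionKeySketch {

    // Hypothetical stand-in for the job's key selector: joins the three
    // fields with "/" in the same way List(...).mkString("/") would in Scala.
    public static String buildKey(String sessionId, String userId, int part) {
        return String.join("/", sessionId, userId, Integer.toString(part));
    }

    public static void main(String[] args) {
        // Build a deliberately long session ID to mimic a long key.
        StringBuilder longSession = new StringBuilder();
        for (int i = 0; i < 10_000; i++) {
            longSession.append('x');
        }

        String first = buildKey(longSession.toString(), "user-42", 7);
        String second = buildKey(longSession.toString(), "user-42", 7);

        // String.hashCode() is computed from the characters only, so equal
        // strings always produce equal hash codes.
        System.out.println(first.equals(second));
        System.out.println(first.hashCode() == second.hashCode());
    }
}
```

Both lines print {{true}}: the key depends only on the three extracted fields, so a long key is slower to hash but not less stable.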

[~klion26]

The version is 1.10; I've updated the ticket.


was (Author: oripwk):
[~yunta]

The error starts happening after the job has run for some time (maybe 10 hours), and after that it happens every time.

The key selector is bound to the data itself, i.e. it extracts the session ID, the user ID and another integer, and builds a string from them: a {{List}} of those three, joined with {{mkString("/")}}.

[~klion26]

The version is 1.10, updated the ticket

> Key group is not in KeyGroupRange
> ---------------------------------
>
>                 Key: FLINK-18637
>                 URL: https://issues.apache.org/jira/browse/FLINK-18637
>             Project: Flink
>          Issue Type: Bug
>         Environment: Version: 1.10.0, Rev:<unknown>, Date:<unknown>
> OS current user: yarn
>  Current Hadoop/Kerberos user: hadoop
>  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.141-b15
>  Maximum heap size: 28960 MiBytes
>  JAVA_HOME: /usr/java/jdk1.8.0_141/jre
>  Hadoop version: 2.8.5-amzn-6
>  JVM Options:
>  -Xmx30360049728
>  -Xms30360049728
>  -XX:MaxDirectMemorySize=4429185024
>  -XX:MaxMetaspaceSize=1073741824
>  -XX:+UseG1GC
>  -XX:+UnlockDiagnosticVMOptions
>  -XX:+G1SummarizeConcMark
>  -verbose:gc
>  -XX:+PrintGCDetails
>  -XX:+PrintGCDateStamps
>  -XX:+UnlockCommercialFeatures
>  -XX:+FlightRecorder
>  -XX:+DebugNonSafepoints
>  -XX:FlightRecorderOptions=defaultrecording=true,settings=/home/hadoop/heap.jfc,dumponexit=true,dumponexitpath=/var/lib/hadoop-yarn/recording.jfr,loglevel=info
>  -Dlog.file=/var/log/hadoop-yarn/containers/application_1593935560662_0002/container_1593935560662_0002_01_000002/taskmanager.log
>  -Dlog4j.configuration=file:./log4j.properties
>  Program Arguments:
>  -Dtaskmanager.memory.framework.off-heap.size=134217728b
>  -Dtaskmanager.memory.network.max=1073741824b
>  -Dtaskmanager.memory.network.min=1073741824b
>  -Dtaskmanager.memory.framework.heap.size=134217728b
>  -Dtaskmanager.memory.managed.size=23192823744b
>  -Dtaskmanager.cpu.cores=7.0
>  -Dtaskmanager.memory.task.heap.size=30225832000b
>  -Dtaskmanager.memory.task.off-heap.size=3221225472b
>  --configDir .
>  -Djobmanager.rpc.address=ip-10-180-30-250.us-west-2.compute.internal
>  -Dweb.port=0
>  -Dweb.tmpdir=/tmp/flink-web-64f613cf-bf04-4a09-8c14-75c31b619574
>  -Djobmanager.rpc.port=33739
>  -Drest.address=ip-10-180-30-250.us-west-2.compute.internal
>            Reporter: Ori Popowski
>            Priority: Major
>
> I'm getting this error when creating a savepoint. According to https://issues.apache.org/jira/browse/FLINK-16193, it is caused by an unstable {{hashCode}} or {{equals}} on the key, or by improper use of {{reinterpretAsKeyedStream}}.
>   
>  My key is a string and I don't use {{reinterpretAsKeyedStream}}.
>  
> {code:scala}
> senv
>   .addSource(source)
>   .flatMap(…)
>   .filterWith { case (metadata, _, _) => … }
>   .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(…))
>   .keyingBy { case (meta, _) => meta.toPath.toString }
>   .process(new TruncateLargeSessions(config.sessionSizeLimit))
>   .keyingBy { case (meta, _) => meta.toPath.toString }
>   .window(EventTimeSessionWindows.withGap(Time.of(…)))
>   .process(new ProcessSession(sessionPlayback, config))
>   .addSink(sink){code}
>  
> {code:java}
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job 962fc8e984e7ca1ed65a038aa62ce124 failed.
> 	at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
> 	at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
> 	at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> 	at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
> 	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
> 	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
> 	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
> 	at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
> 	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> 	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> 	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> 	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> 	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
> 	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> 	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> 	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> 	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> 	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$1(CheckpointCoordinator.java:457)
> 	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> 	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> 	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> 	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:429)
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1466)
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1379)
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:719)
> 	at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$declineCheckpoint$5(SchedulerBase.java:807)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
> 	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:428)
> 	... 11 more
> Caused by: java.lang.Exception: Could not materialize checkpoint 15 for operator KeyedProcess (11/216).
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
> 	... 3 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Key group 13880 is not in KeyGroupRange{startKeyGroup=24, endKeyGroup=26}.
> 	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> 	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
> 	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
> 	... 3 more
> Caused by: java.lang.IllegalArgumentException: Key group 13880 is not in KeyGroupRange{startKeyGroup=24, endKeyGroup=26}.
> 	at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
> 	at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
> 	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:350)
> 	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
> 	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
> 	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
> 	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
> {code}
>  
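
The numbers in the exception above can be sanity-checked with a little integer arithmetic. The following is only a sketch: the class and method names are made up, the formulas are written from memory of Flink's {{KeyGroupRangeAssignment}} and may differ in detail from the real implementation, and the max parallelism of 512 is an assumption (it is not stated in the ticket, but it is consistent with subtask 11 of 216 owning key groups 24 to 26).

```java
public class KeyGroupRangeSketch {

    // Key group range owned by a zero-based operator index.
    // Assumed to mirror the integer arithmetic Flink uses.
    public static int[] rangeForOperatorIndex(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Inverse mapping: which operator index a key group belongs to.
    public static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // A key group can only be valid if it lies in [0, maxParallelism).
    public static boolean isValidKeyGroup(int maxParallelism, int keyGroup) {
        return keyGroup >= 0 && keyGroup < maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 512; // assumption, see the note above
        int parallelism = 216;    // from "KeyedProcess (11/216)"
        int operatorIndex = 10;   // subtask 11, zero-based

        int[] range = rangeForOperatorIndex(maxParallelism, parallelism, operatorIndex);
        System.out.println("KeyGroupRange{startKeyGroup=" + range[0] + ", endKeyGroup=" + range[1] + "}");

        // The key group reported in the exception.
        System.out.println(isValidKeyGroup(maxParallelism, 13880));
    }
}
```

Under these assumptions the sketch prints {{KeyGroupRange{startKeyGroup=24, endKeyGroup=26}}} followed by {{false}}. In other words, 13880 would be out of range for every subtask, not just for this one.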



--
This message was sent by Atlassian Jira
(v8.3.4#803005)