Posted to issues@flink.apache.org by "Xulang Wan (JIRA)" <ji...@apache.org> on 2019/06/27 10:56:00 UTC

[jira] [Comment Edited] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

    [ https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873124#comment-16873124 ] 

Xulang Wan edited comment on FLINK-11774 at 6/27/19 10:55 AM:
--------------------------------------------------------------

I also encountered this issue, using a string as the key. The key is randomly generated in [0, 1000] per element.

It seems the error happens because the KeyGroup is outside the KeyGroupRange, which did not make sense to me, since neither of them should change once the application is running.

{color:#FF0000}After some investigation, I found why I hit such an exception in my case:{color}

{color:#333333}In my case, the key was produced by an RNG function; under the hood, Flink associates this function with the stream's KeyExtractor, which is invoked whenever the key is needed for an element.{color}

{color:#333333}So what actually happened is: when an element was partitioned by key, the extractor produced a random number, say {color}{color:#d04437}K1{color}{color:#333333}.{color}

{color:#333333}Later, when we perform another key-related operation (in this case, registering a timer for the key), Flink checks whether the key's KeyGroup (basically a hash of the key) lies within the operator's KeyGroupRange.{color}

But since the KeyExtractor is invoked again, we may get a different random number as the key, and the exception is thrown because an element with this new key is not supposed to be handled by this operator instance.
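To make this concrete, here is a minimal sketch of the two kinds of key selectors (class names and values are illustrative, not taken from my actual job); it uses Flink's KeyGroupRangeAssignment utility, which as far as I know is what computes the key group, to show how the same element can land in two different key groups when the selector is not deterministic:

{code:java}
import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Illustrative sketch only. A key selector is re-evaluated every time Flink
// needs the current key, so it must be a deterministic function of the element.
public class KeySelectorSketch {

    // BROKEN: returns a different key for the same element on every invocation,
    // so the key group computed at partition time can differ from the one
    // computed when a timer is registered.
    static class RandomKeySelector implements KeySelector<String, Integer> {
        @Override
        public Integer getKey(String element) {
            return ThreadLocalRandom.current().nextInt(0, 1001); // random key in [0, 1000]
        }
    }

    // OK: the key is a pure function of the element, so every invocation maps
    // the element to the same key and hence the same key group.
    static class StableKeySelector implements KeySelector<String, Integer> {
        @Override
        public Integer getKey(String element) {
            return Math.floorMod(element.hashCode(), 1000);
        }
    }

    public static void main(String[] args) throws Exception {
        int maxParallelism = 128;
        String element = "some-element";

        // With the broken selector the two computed key groups usually differ,
        // which is exactly the situation the KeyGroupRange check rejects.
        RandomKeySelector broken = new RandomKeySelector();
        int groupAtPartitioning = KeyGroupRangeAssignment.assignToKeyGroup(broken.getKey(element), maxParallelism);
        int groupAtTimerRegistration = KeyGroupRangeAssignment.assignToKeyGroup(broken.getKey(element), maxParallelism);
        System.out.println(groupAtPartitioning + " vs " + groupAtTimerRegistration);
    }
}
{code}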

{color:#333333}As for this ticket's case, I strongly suspect that the KeyExtractor also returns different values for the same element, which causes the issue.{color}
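I cannot confirm it from the attached reproducer alone, but the {{Key}} class in the code below does not override hashCode/equals, so every copy of the key falls back to identity hashCode and hashes differently, which would have the same effect as a non-deterministic extractor. Purely as an illustration (not a verified fix for this ticket), a key class with value-based hashCode/equals keeps the key group stable across copies:

{code:java}
import java.util.Objects;

// Illustrative only: a key type whose hashCode/equals are value-based, so every
// copy of the same logical key (e.g. after serialization between operators)
// hashes to the same key group. Relying on Object's identity hashCode has the
// same effect as a key extractor that returns different values per call.
public class StableKey {
    private final long id;

    public StableKey(long id) {
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof StableKey)) {
            return false;
        }
        return id == ((StableKey) o).id;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }
}
{code}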


was (Author: wanren192):
I also encountered this issue, using a string as the key. The key is randomly generated in [0, 1000] per element.

It seems the error happens because the KeyGroup is outside the KeyGroupRange, which did not make sense to me, since neither of them should change once the application is running.

> IllegalArgumentException in HeapPriorityQueueSet
> ------------------------------------------------
>
>                 Key: FLINK-11774
>                 URL: https://issues.apache.org/jira/browse/FLINK-11774
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.7.2
>         Environment: Can reproduce on the following configurations:
>  
> OS: macOS 10.14.3
> Java: 1.8.0_202
>  
> OS: CentOS 7.2.1511
> Java: 1.8.0_102
>            Reporter: Kirill Vainer
>            Priority: Major
>         Attachments: flink-bug-dist.zip, flink-bug-src.zip
>
>
> Hi,
> I encountered the following exception:
> {code}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>         at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>         at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
>         at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
>         at flink.bug.App.main(App.java:21)
> Caused by: java.lang.IllegalArgumentException
>         at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
>         at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876)
>         at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
>         at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> Code that reproduces the problem:
> {code:java}
> package flink.bug;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> public class App {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(2);
>         env.fromElements(1, 2)
>             .map(Aggregate::new)
>             .keyBy(Aggregate::getKey)
>             .timeWindow(Time.seconds(2))
>             .reduce(Aggregate::reduce)
>             .addSink(new CollectSink());
>         env.execute();
>     }
>     private static class Aggregate {
>         private Key key = new Key();
>         public Aggregate(long number) {
>         }
>         public static Aggregate reduce(Aggregate a, Aggregate b) {
>             return new Aggregate(0);
>         }
>         public Key getKey() {
>             return key;
>         }
>     }
>     public static class Key {
>     }
>     private static class CollectSink implements SinkFunction<Aggregate> {
>         private static final long serialVersionUID = 1;
>         @SuppressWarnings("rawtypes")
>         @Override
>         public void invoke(Aggregate value, Context ctx) throws Exception {
>         }
>     }
> }
> {code}
> Attached is the project that can be executed with {{./gradlew run}} showing the problem, or you can run the attached {{flink-bug-dist.zip}} which is prepackaged with the dependencies.
> Thanks in advance



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)