Posted to issues@flink.apache.org by "Xulang Wan (JIRA)" <ji...@apache.org> on 2019/06/26 09:13:00 UTC

[jira] [Commented] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

    [ https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873124#comment-16873124 ] 

Xulang Wan commented on FLINK-11774:
------------------------------------

I also encountered this issue when using a string as the key. The key is randomly generated in the range [0, 1000] for each element.

It seems the error occurs because the key group falls outside the KeyGroupRange, which doesn't make sense to me, since neither of them should change once the application is running.
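
One possible explanation, offered as an assumption rather than a confirmed diagnosis: the key group is derived from the key's hashCode, and the key is computed once when the record is partitioned and again inside the keyed operator. If those two evaluations disagree (a randomly generated key, or a key class that relies on Object.hashCode()), the second key group can belong to a different subtask. The sketch below is a simplified stand-in, not Flink's code: KeyGroupSketch, keyGroupFor, subtaskFor and staysInRange are hypothetical names, the plain modulo replaces the extra hashing Flink applies, and a maximum parallelism of 128 is assumed.

```java
import java.util.Random;

// Simplified sketch, NOT Flink's actual code: Flink also scrambles
// key.hashCode() with a murmur hash before taking the modulo.
final class KeyGroupSketch {

    // Stand-in for key-group assignment: key hash modulo max parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Subtask that owns a key group under a proportional split of the groups.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // True if the key seen inside the operator maps back to the same subtask
    // that the key seen at partitioning time was routed to.
    static boolean staysInRange(Object keyAtPartitioning, Object keyInOperator,
                                int maxParallelism, int parallelism) {
        int routedTo = subtaskFor(
                keyGroupFor(keyAtPartitioning, maxParallelism), maxParallelism, parallelism);
        int ownedBy = subtaskFor(
                keyGroupFor(keyInOperator, maxParallelism), maxParallelism, parallelism);
        return routedTo == ownedBy;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for this sketch
        int parallelism = 2;      // same as the reproducer in the issue
        Random random = new Random();

        // Deterministic key: both evaluations agree, so the check passes.
        System.out.println(staysInRange("user-7", "user-7", maxParallelism, parallelism));

        // Non-deterministic key: the two evaluations may return different
        // values, so the check can fail for some elements.
        int mismatches = 0;
        for (int i = 0; i < 1000; i++) {
            String first = String.valueOf(random.nextInt(1001));
            String second = String.valueOf(random.nextInt(1001));
            if (!staysInRange(first, second, maxParallelism, parallelism)) {
                mismatches++;
            }
        }
        System.out.println("mismatches out of 1000: " + mismatches);
    }
}
```

If this is indeed the cause, generating the random value once, storing it as a field on the element, and having the key selector only read that field would keep both evaluations consistent.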

> IllegalArgumentException in HeapPriorityQueueSet
> ------------------------------------------------
>
>                 Key: FLINK-11774
>                 URL: https://issues.apache.org/jira/browse/FLINK-11774
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.7.2
>         Environment: Can reproduce on the following configurations:
>  
> OS: macOS 10.14.3
> Java: 1.8.0_202
>  
> OS: CentOS 7.2.1511
> Java: 1.8.0_102
>            Reporter: Kirill Vainer
>            Priority: Major
>         Attachments: flink-bug-dist.zip, flink-bug-src.zip
>
>
> Hi,
> I encountered the following exception:
> {code}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>         at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>         at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
>         at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
>         at flink.bug.App.main(App.java:21)
> Caused by: java.lang.IllegalArgumentException
>         at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>         at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
>         at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876)
>         at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
>         at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> Code that reproduces the problem:
> {code:java}
> package flink.bug;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> public class App {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(2);
>         env.fromElements(1, 2)
>             .map(Aggregate::new)
>             .keyBy(Aggregate::getKey)
>             .timeWindow(Time.seconds(2))
>             .reduce(Aggregate::reduce)
>             .addSink(new CollectSink());
>         env.execute();
>     }
>     private static class Aggregate {
>         private Key key = new Key();
>         public Aggregate(long number) {
>         }
>         public static Aggregate reduce(Aggregate a, Aggregate b) {
>             return new Aggregate(0);
>         }
>         public Key getKey() {
>             return key;
>         }
>     }
>     public static class Key {
>     }
>     private static class CollectSink implements SinkFunction<Aggregate> {
>         private static final long serialVersionUID = 1;
>         @SuppressWarnings("rawtypes")
>         @Override
>         public void invoke(Aggregate value, Context ctx) throws Exception {
>         }
>     }
> }
> {code}
> Attached is the project that can be executed with {{./gradlew run}} showing the problem, or you can run the attached {{flink-bug-dist.zip}} which is prepackaged with the dependencies.
> Thanks in advance



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)