You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Yu Li (Jira)" <ji...@apache.org> on 2020/01/08 13:40:00 UTC

[jira] [Updated] (FLINK-14763) cep parallelism error

     [ https://issues.apache.org/jira/browse/FLINK-14763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu Li updated FLINK-14763:
--------------------------
    Fix Version/s: 1.10.0

> cep  parallelism error 
> -----------------------
>
>                 Key: FLINK-14763
>                 URL: https://issues.apache.org/jira/browse/FLINK-14763
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.10.0
>         Environment: flink on yarn 
> flink 1.10
> hadoop 3.0
> kafka 2.2.0
>            Reporter: richt richt
>            Priority: Major
>             Fix For: 1.10.0
>
>
> when i commit a cep sql with sql-client use parallelism large than 1 , it  print error as blow
> {code:java}
> //代码占位符
> java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=0, endKeyGroup=15} does not contain key group 16java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=0, endKeyGroup=15} does not contain key group 16 at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121) at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:216) at org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285) at org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:266) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:163) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:149) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:282) at org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:151) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:430) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:696) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521) at java.lang.Thread.run(Thread.java:748)
> {code}
> it seems allocate some key to the wrong taskmanager 
>  
> the yaml is 
> {code:java}
> //代码占位符
> execution:
>   planner: blink
>   type: streaming
>   parallelism: 32
> ....
> - name: Ticker
>     type: source-table
>     update-mode: append
>     connector:
>       sink-partitioner: round-robin
>       sink-partitioner-class: org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
>       property-version: 1
>       type: kafka
>       version: universal
>       topic: test_part
>       startup-mode: earliest-offset
>       properties:
>         - key: bootstrap.servers
>           value:  localhost:9092
>         - key: group.id
>           value: testGroup
>     format:
>       property-version: 1
>       type: json
>       derive-schema: true
>     schema:
>         - name: symbol
>           type: VARCHAR
>         - name: rtime
>           type: TIMESTAMP
>           rowtime:
>             timestamps:
>               type: from-field
>               from: rowtime
>             watermarks:
>               type: periodic-bounded
>               delay: 2000
>         - name: price
>           type: BIGINT
>         - name: tax
>           type: BIGINT
> {code}
> and the query is from the demo:
> {code:java}
> SELECT *
> FROM Ticker
>     MATCH_RECOGNIZE(
>         PARTITION BY symbol
>         ORDER BY rtime
>         MEASURES
>             C.price AS lastPrice
>         ONE ROW PER MATCH
>         AFTER MATCH SKIP PAST LAST ROW
>         PATTERN (A B* C)
>         DEFINE
>             A AS A.price > 10,
>             B AS B.price < 15,
>             C AS C.price > 12
>     )
> {code}
> the data :
> {code:java}
>                    symbol                     rtime                     price                       tax
>                       ACME          2011-11-11T10:00                        12                         1
>                       ACME       2011-11-11T10:00:02                        19                         1
>                       ACME       2011-11-11T10:00:01                        17                         2
>                       ACME       2011-11-11T10:00:03                        21                         3
>                       ACME       2011-11-11T10:00:04                        25                         2
>                       ACME       2011-11-11T10:00:05                        18                         1
>                       ACME       2011-11-11T10:00:06                        15                         1
>                       ACME       2011-11-11T10:00:07                        14                         2
>                       ACME       2011-11-11T10:00:08                        24                         2
>                       ACME       2011-11-11T10:00:09                        25                         2
>                       ACME       2011-11-11T10:00:10                        19                         1
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)