Posted to issues@flink.apache.org by "richt richt (Jira)" <ji...@apache.org> on 2019/12/16 07:17:00 UTC
[jira] [Updated] (FLINK-14763) cep parallelism error
[ https://issues.apache.org/jira/browse/FLINK-14763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
richt richt updated FLINK-14763:
--------------------------------
Description:
When I submit a CEP SQL query through the SQL client with a parallelism larger than 1, it prints the error below:
{code:java}
java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=0, endKeyGroup=15} does not contain key group 16
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:216)
    at org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285)
    at org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:266)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:163)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:149)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:282)
    at org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:151)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:430)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:696)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
    at java.lang.Thread.run(Thread.java:748)
{code}
It seems that some keys are assigned to the wrong TaskManager.
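For readers unfamiliar with key groups, the following is a rough sketch of the range arithmetic that Flink's KeyGroupRangeAssignment is understood to use. It is plain Java with no Flink dependency. The class name KeyGroupSketch and its method names are made up for illustration. The values maxParallelism = 128 and parallelism = 8 are assumptions, since the report states neither; they were picked only because they give subtask 0 the range 0..15 seen in the exception.

```java
// Sketch only: plain-Java re-implementation of key-group range arithmetic.
// Class and method names are hypothetical; this is not Flink's own code.
class KeyGroupSketch {

    /** First key group owned by the subtask with the given index. */
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the subtask with the given index. */
    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** Index of the subtask whose range should contain the given key group. */
    static int ownerOf(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumption, not stated in the report
        int parallelism = 8;      // assumption, not stated in the report

        // Print the key-group range of every subtask.
        for (int i = 0; i < parallelism; i++) {
            System.out.printf("subtask %d owns key groups [%d, %d]%n",
                    i,
                    rangeStart(maxParallelism, parallelism, i),
                    rangeEnd(maxParallelism, parallelism, i));
        }

        // The key group named in the exception message.
        int keyGroup = 16;
        System.out.printf("key group %d belongs to subtask %d%n",
                keyGroup, ownerOf(maxParallelism, parallelism, keyGroup));
    }
}
```

Under these assumed values, subtask 0 owns key groups 0..15 and key group 16 falls to subtask 1. That is consistent with the trace, where the range check fails while CepOperator registers an event-time timer, but the sketch does not show why the record reached that subtask.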
The YAML is:
{code:java}
- name: Ticker
  type: source-table
  update-mode: append
  connector:
    sink-partitioner: round-robin
    sink-partitioner-class: org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
    property-version: 1
    type: kafka
    version: universal
    topic: test_part
    startup-mode: earliest-offset
    properties:
      - key: zookeeper.connect
        value: 10.214.96.129:2181
      - key: bootstrap.servers
        value: 172.24.136.97:9092,172.24.136.98:9092,172.24.136.99:9092
      - key: group.id
        value: testGroup
  format:
    property-version: 1
    type: json
    derive-schema: true
  schema:
    - name: symbol
      type: VARCHAR
    - name: rtime
      type: TIMESTAMP
      rowtime:
        timestamps:
          type: from-field
          from: rowtime
        watermarks:
          type: periodic-bounded
          delay: 2000
    - name: price
      type: BIGINT
    - name: tax
      type: BIGINT
{code}
was:
When I submit a CEP SQL query through the SQL client with a parallelism larger than 1, it prints the error below:
{code:java}
java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=0, endKeyGroup=15} does not contain key group 16
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:216)
    at org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285)
    at org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:266)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:163)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:149)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:282)
    at org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:151)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:430)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:696)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
    at java.lang.Thread.run(Thread.java:748)
{code}
It seems that some keys are assigned to the wrong TaskManager.
Here is the demo I used: [link title|https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/match_recognize.html]
> cep parallelism error
> -----------------------
>
> Key: FLINK-14763
> URL: https://issues.apache.org/jira/browse/FLINK-14763
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Environment: flink on yarn
> flink 1.10
> hadoop 3.0
> kafka 2.2.0
> Reporter: richt richt
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)