Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2017/08/11 10:24:00 UTC
[jira] [Commented] (FLINK-7424) `CEP` component make `KeyedStream` choose wrong channel
[ https://issues.apache.org/jira/browse/FLINK-7424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16123143#comment-16123143 ]
Aljoscha Krettek commented on FLINK-7424:
-----------------------------------------
The key returned from a {{KeySelector}} must be deterministic, because Flink calls {{getKey()}} whenever it needs the key for a given element; it does not call {{getKey()}} once and store the result. This is why your second {{KeySelector}} does not work: because it draws from a {{Random}}, it can return a different key each time it is called for the same element.
Unfortunately, this is not well documented, but there is an issue for fixing that: https://issues.apache.org/jira/browse/FLINK-4047.
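A minimal, Flink-free sketch of why determinism matters: it calls the key function twice on the same record, as Flink may do (for example, once when choosing the output channel and again in the receiving keyed operator), and counts how often the two keys differ. {{KeyFn}} and {{LogRecord}} are hypothetical stand-ins for {{KeySelector}} and {{HBaseServerLog}}, and {{requestId}} is an assumed field used only to derive a deterministic salt.

{code:java}
import java.util.Objects;
import java.util.Random;

// Hypothetical stand-in for Flink's KeySelector<IN, KEY>; unlike the real
// interface, getKey() declares no checked exception here.
interface KeyFn<IN, KEY> {
    KEY getKey(IN value);
}

// Hypothetical record: flumeId comes from the issue, requestId is an assumed field.
final class LogRecord {
    final Integer flumeId;
    final String requestId;

    LogRecord(Integer flumeId, String requestId) {
        this.flumeId = flumeId;
        this.requestId = requestId;
    }
}

final class KeyDeterminismDemo {
    // Mirrors the issue's second selector: the key depends on a Random.
    static KeyFn<LogRecord, Integer> randomSalted(int parallelism, long seed) {
        Random r = new Random(seed);
        return log -> {
            if (log == null) return 0;
            if (log.flumeId == null) return 1;
            return Math.max(log.flumeId + (r.nextBoolean() ? 0 : -r.nextInt(parallelism)), 0);
        };
    }

    // Deterministic alternative: the salt is derived from the record itself,
    // so repeated calls for the same record always return the same key.
    static KeyFn<LogRecord, Integer> fieldSalted(int parallelism) {
        return log -> {
            if (log == null) return 0;
            if (log.flumeId == null) return 1;
            int salt = Math.floorMod(Objects.hashCode(log.requestId), parallelism);
            return log.flumeId * parallelism + salt; // up to `parallelism` keys per flumeId
        };
    }

    // Extracts the key twice per record and counts records whose keys differ.
    static int countUnstableKeys(KeyFn<LogRecord, Integer> fn, int records) {
        int unstable = 0;
        for (int i = 0; i < records; i++) {
            LogRecord log = new LogRecord(i, "req-" + i);
            if (!Objects.equals(fn.getKey(log), fn.getKey(log))) {
                unstable++;
            }
        }
        return unstable;
    }

    public static void main(String[] args) {
        int parallelism = 8;
        System.out.println("random salt, unstable keys: "
                + countUnstableKeys(randomSalted(parallelism, 42L), 1000));
        System.out.println("field salt, unstable keys: "
                + countUnstableKeys(fieldSalted(parallelism), 1000));
    }
}
{code}

Deriving the salt from the record still spreads one {{flumeId}} over several keys, but because the salt no longer changes between calls, every extraction for the same record agrees.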
> `CEP` component make `KeyedStream` choose wrong channel
> -------------------------------------------------------
>
> Key: FLINK-7424
> URL: https://issues.apache.org/jira/browse/FLINK-7424
> Project: Flink
> Issue Type: Bug
> Components: CEP, Streaming
> Reporter: Benedict Jin
> Assignee: Benedict Jin
>
> The `CEP` component makes `KeyedStream` choose the wrong channel.
> The original KeySelector works correctly:
> {code:java}
> public static KeySelector<HBaseServerLog, Integer> buildKeySelector() {
>     return (KeySelector<HBaseServerLog, Integer>) log -> {
>         if (log == null) return 0;
>         Integer flumeId;
>         if ((flumeId = log.getFlumeId()) == null) return 1;
>         return flumeId;
>     };
> }
> {code}
> After changing it as follows, it throws a "Key group index out of range of key group range [16, 32)" exception:
> {code:java}
> public static KeySelector<HBaseServerLog, Integer> buildKeySelector(final int parallelism) {
>     return new KeySelector<HBaseServerLog, Integer>() {
>         private Random r = new Random(System.nanoTime());
>
>         @Override
>         public Integer getKey(HBaseServerLog log) throws Exception {
>             if (log == null) return 0;
>             Integer flumeId;
>             if ((flumeId = log.getFlumeId()) == null) return 1;
>             return Math.max(flumeId + (r.nextBoolean() ? 0 : -1 * r.nextInt(parallelism)), 0);
>         }
>     };
> }
> {code}
> But since the key is mapped through MathUtils.murmurHash(keyHash) % maxParallelism, it shouldn't be wrong. Actually, when we add some `CEP` component code (IterativeCondition/PatternFlatSelectFunction) after it, the KeySelector chooses the wrong channel and throws an IllegalArgumentException.
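The hashing argument above overlooks that the key is extracted again after the record has been routed. The following simplified, Flink-free model illustrates how a re-extracted key can land outside the receiver's key-group range. The {{mix}} helper is a stand-in for {{MathUtils.murmurHash}} (it applies the MurmurHash3 32-bit finalizer and is not guaranteed to produce Flink's exact values); the channel and key-group-range formulas are modeled on Flink's key-group assignment and should be checked against the Flink source; parallelism 8 and maxParallelism 128 are assumed values that happen to give operator 1 the range [16, 32) from the reported exception.

{code:java}
import java.util.Random;

// Simplified, stand-alone model of key-group routing; NOT Flink's actual code.
final class KeyGroupDemo {
    // Stand-in for MathUtils.murmurHash: MurmurHash3 32-bit finalizer, then
    // forced non-negative. Values are not guaranteed to match Flink's.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & 0x7fffffff;
    }

    // Key group of a key: hash(key) % maxParallelism.
    static int keyGroupFor(int key, int maxParallelism) {
        return mix(Integer.hashCode(key)) % maxParallelism;
    }

    // Downstream operator (channel) that owns a key group.
    static int operatorFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // First and last (inclusive) key group owned by an operator.
    static int rangeStart(int op, int maxParallelism, int parallelism) {
        return (op * maxParallelism + parallelism - 1) / parallelism;
    }

    static int rangeEnd(int op, int maxParallelism, int parallelism) {
        return ((op + 1) * maxParallelism - 1) / parallelism;
    }

    // The sender routes by the first extracted key; the receiver re-extracts the
    // key and accepts the record only if its key group lies in the receiver's range.
    static boolean receiverAccepts(int senderKey, int receiverKey,
                                   int maxParallelism, int parallelism) {
        int channel = operatorFor(keyGroupFor(senderKey, maxParallelism), maxParallelism, parallelism);
        int keyGroup = keyGroupFor(receiverKey, maxParallelism);
        return keyGroup >= rangeStart(channel, maxParallelism, parallelism)
                && keyGroup <= rangeEnd(channel, maxParallelism, parallelism);
    }

    // Evaluates the issue's random perturbation twice per record (sender, then
    // receiver) and counts the records the receiver would reject.
    static int countRejected(int records, long seed, int maxParallelism, int parallelism) {
        Random r = new Random(seed);
        int rejected = 0;
        for (int flumeId = 0; flumeId < records; flumeId++) {
            int senderKey = Math.max(flumeId + (r.nextBoolean() ? 0 : -r.nextInt(parallelism)), 0);
            int receiverKey = Math.max(flumeId + (r.nextBoolean() ? 0 : -r.nextInt(parallelism)), 0);
            if (!receiverAccepts(senderKey, receiverKey, maxParallelism, parallelism)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 8;
        // Prints "operator 1 owns key groups [16, 32)".
        System.out.println("operator 1 owns key groups [" + rangeStart(1, maxParallelism, parallelism)
                + ", " + (rangeEnd(1, maxParallelism, parallelism) + 1) + ")");
        System.out.println("rejected with random keys: "
                + countRejected(1000, 7L, maxParallelism, parallelism));
    }
}
{code}

With a deterministic key, {{senderKey}} and {{receiverKey}} are always equal and every record is accepted; the hash and modulo cannot compensate for a key that changes between calls.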
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)