Posted to issues@flink.apache.org by "Benedict Jin (JIRA)" <ji...@apache.org> on 2017/08/11 04:14:01 UTC

[jira] [Created] (FLINK-7424) `CEP` component make `KeyedStream` choose wrong channel

Benedict Jin created FLINK-7424:
-----------------------------------

             Summary: `CEP` component make `KeyedStream` choose wrong channel
                 Key: FLINK-7424
                 URL: https://issues.apache.org/jira/browse/FLINK-7424
             Project: Flink
          Issue Type: Bug
          Components: CEP, Streaming
            Reporter: Benedict Jin
            Assignee: Benedict Jin


The `CEP` component makes `KeyedStream` choose the wrong channel.

The original KeySelector works correctly.
{code:java}
public static KeySelector<HBaseServerLog, Integer> buildKeySelector() {
    return (KeySelector<HBaseServerLog, Integer>) log -> {
        if (log == null) return 0;
        Integer flumeId;
        if ((flumeId = log.getFlumeId()) == null) return 1;
        return flumeId;
    };
}
{code}

After some changes to the KeySelector, it throws a `Key group index out of range of key group range [16, 32)` exception.
{code:java}
public static KeySelector<HBaseServerLog, Integer> buildKeySelector(final int parallelism) {
    return new KeySelector<HBaseServerLog, Integer>() {
        private Random r = new Random(System.nanoTime());
        @Override
        public Integer getKey(HBaseServerLog log) throws Exception {
            if (log == null) return 0;
            Integer flumeId;
            if ((flumeId = log.getFlumeId()) == null) return 1;
            return Math.max(flumeId + (r.nextBoolean() ? 0 : -1 * r.nextInt(parallelism)), 0);
        }
    };
}
{code}

However, since the key group is computed as `MathUtils.murmurHash(keyHash) % maxParallelism`, the result should not be out of range. The error only appears when some `CEP` component code (IterativeCondition/PatternFlatSelectFunction) is added after the keyed stream: the `KeySelector` then seems to choose the wrong channel, and an IllegalArgumentException is thrown.
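
For reference, the routing arithmetic mentioned above can be sketched roughly as follows. This is a standalone illustration, not Flink's code: `KeyGroupSketch` and all of its method names are hypothetical, `mixHash` is only a stand-in for `MathUtils.murmurHash`, and the values parallelism = 8 and maxParallelism = 128 are assumptions, chosen because they give subtask 1 the range [16, 32) seen in the exception message.

```java
final class KeyGroupSketch {

    // Stand-in for MathUtils.murmurHash: NOT Flink's implementation, only a
    // deterministic mixer that always returns a non-negative int.
    static int mixHash(int code) {
        int h = code * 0x9E3779B1;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE;
    }

    // key -> key group, following the "hash % maxParallelism" step.
    static int keyGroupFor(Object key, int maxParallelism) {
        return mixHash(key.hashCode()) % maxParallelism;
    }

    // key group -> index of the parallel subtask (channel) that owns it.
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // First key group owned by a subtask (inclusive).
    static int rangeStart(int subtask, int parallelism, int maxParallelism) {
        return (subtask * maxParallelism + parallelism - 1) / parallelism;
    }

    // One past the last key group owned by a subtask (exclusive).
    static int rangeEndExclusive(int subtask, int parallelism, int maxParallelism) {
        return ((subtask + 1) * maxParallelism - 1) / parallelism + 1;
    }

    // Range check that fails with a message shaped like the one in this report.
    static void checkInRange(int keyGroup, int start, int endExclusive) {
        if (keyGroup < start || keyGroup >= endExclusive) {
            throw new IllegalArgumentException(
                "Key group index out of range of key group range ["
                    + start + ", " + endExclusive + ")");
        }
    }

    public static void main(String[] args) {
        int parallelism = 8;       // assumed value
        int maxParallelism = 128;  // assumed value
        for (int flumeId = 0; flumeId < 5; flumeId++) {
            int keyGroup = keyGroupFor(flumeId, maxParallelism);
            int subtask = subtaskFor(keyGroup, parallelism, maxParallelism);
            // Passes as long as the same key is used for routing and for the check.
            checkInRange(keyGroup,
                rangeStart(subtask, parallelism, maxParallelism),
                rangeEndExclusive(subtask, parallelism, maxParallelism));
            System.out.println("flumeId=" + flumeId
                + " keyGroup=" + keyGroup + " subtask=" + subtask);
        }
    }
}
```

In this sketch the check cannot fail while the key used for routing and the key used for the range check are the same value. It can fail only if the two lookups see different keys for one record, which may be relevant here because the changed KeySelector draws from a `Random` on every call; this report does not confirm whether that is what happens inside the `CEP` operator.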


