Posted to issues@flink.apache.org by "Benedict Jin (JIRA)" <ji...@apache.org> on 2017/08/11 04:14:01 UTC
[jira] [Created] (FLINK-7424) `CEP` component make `KeyedStream`
choose wrong channel
Benedict Jin created FLINK-7424:
-----------------------------------
Summary: `CEP` component make `KeyedStream` choose wrong channel
Key: FLINK-7424
URL: https://issues.apache.org/jira/browse/FLINK-7424
Project: Flink
Issue Type: Bug
Components: CEP, Streaming
Reporter: Benedict Jin
Assignee: Benedict Jin
The `CEP` component makes `KeyedStream` choose the wrong channel.
The original KeySelector works correctly.
{code:java}
public static KeySelector<HBaseServerLog, Integer> buildKeySelector() {
    return (KeySelector<HBaseServerLog, Integer>) log -> {
        if (log == null) return 0;
        Integer flumeId;
        if ((flumeId = log.getFlumeId()) == null) return 1;
        return flumeId;
    };
}
{code}
After changing it as shown below, it throws a {code:java}Key group index out of range of key group range [16, 32){code} exception.
{code:java}
public static KeySelector<HBaseServerLog, Integer> buildKeySelector(final int parallelism) {
    return new KeySelector<HBaseServerLog, Integer>() {
        private Random r = new Random(System.nanoTime());

        @Override
        public Integer getKey(HBaseServerLog log) throws Exception {
            if (log == null) return 0;
            Integer flumeId;
            if ((flumeId = log.getFlumeId()) == null) return 1;
            return Math.max(flumeId + (r.nextBoolean() ? 0 : -1 * r.nextInt(parallelism)), 0);
        }
    };
}
{code}
However, after the {code:java}MathUtils.murmurHash(keyHash) % maxParallelism{code} step, the computed channel should still be valid. In practice, once we add some `CEP` component code (IterativeCondition/PatternFlatSelectFunction) after it, the {code:java}KeySelector{code} chooses the wrong channel and an IllegalArgumentException is thrown.
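To make the hash-then-modulo step concrete, here is a minimal standalone sketch. It does not use Flink: `mix` is a hypothetical stand-in for `MathUtils.murmurHash` (not Flink's actual implementation), and `KeyGroupSketch`, `keyGroup`, and `randomisedKey` are names invented for illustration. It shows that the modulo always yields an index in `[0, maxParallelism)`, and that the randomised selector above can return different keys for the same record on successive calls. If the key were extracted more than once per record, the resulting index could differ between calls. Whether that is what happens here is an assumption, not something this report establishes.

```java
import java.util.Random;

public class KeyGroupSketch {

    // Hypothetical stand-in mixer; NOT Flink's MathUtils.murmurHash.
    // Only the two properties used below matter: it is deterministic
    // and it returns a non-negative int.
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        h *= 0xc2b2ae35;
        h ^= (h >>> 16);
        return h & Integer.MAX_VALUE; // clear the sign bit
    }

    // Same shape as the expression quoted in the report:
    // hash(keyHash) % maxParallelism.
    static int keyGroup(int key, int maxParallelism) {
        return mix(Integer.hashCode(key)) % maxParallelism;
    }

    // Mirrors the arithmetic of the randomised getKey() from the report.
    static int randomisedKey(int flumeId, int parallelism, Random r) {
        return Math.max(flumeId + (r.nextBoolean() ? 0 : -1 * r.nextInt(parallelism)), 0);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value, for illustration only
        int parallelism = 8;      // assumed value, for illustration only
        int flumeId = 100;        // assumed record field value
        Random r = new Random(System.nanoTime());

        // Extract the key for the same record several times and print
        // the index each extraction would map to.
        for (int i = 0; i < 5; i++) {
            int key = randomisedKey(flumeId, parallelism, r);
            System.out.println("key=" + key + " -> index=" + keyGroup(key, maxParallelism));
        }
    }
}
```

With the original selector, the key for a given record is always the same, so every extraction maps to the same index. With the randomised one, the printed keys and indices may vary from line to line.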