You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Kostas Kloudas (JIRA)" <ji...@apache.org> on 2018/01/19 13:39:00 UTC
[jira] [Commented] (FLINK-8248) RocksDB state backend Checkpointing
is not working with KeyedCEP in 1.4
[ https://issues.apache.org/jira/browse/FLINK-8248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16332260#comment-16332260 ]
Kostas Kloudas commented on FLINK-8248:
---------------------------------------
Hi [~sonice_lj]! Did you manage to see if the issue that [~dawidwys] posted above is also your issue?
> RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4
> -----------------------------------------------------------------------
>
> Key: FLINK-8248
> URL: https://issues.apache.org/jira/browse/FLINK-8248
> Project: Flink
> Issue Type: Bug
> Components: CEP, State Backends, Checkpointing
> Affects Versions: 1.4.0, 1.3.2
> Environment: linux: 3.10.0-514.el7.x86_64
> flink:
> * version: 1.4
> * rocksdb state backend
> * checkpoint interval 5s
> * keyed cep
> language: Java8
> Reporter: jia liu
> Priority: Major
>
> Here is my exception log:
> {code:java}
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:291)
> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
> at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:103)
> at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:309)
> at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:247)
> at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:277)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:886)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
> ... 7 more
> Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon', detector='SlidingWindowAnomalyDetector', measure='count', field='activity', dimension='Logoff', description='null', icons=null, startTimestamp=1465297200000, endTimestamp=1465297203600, count=11.0, anomalyScore=100, adHashCode=-1866791453, timeMap={1465297200000=11.0}, user='LMR0049', logQuery=null, group='null'}, 1465300799999, 0), [SharedBufferEdge(null, 199)], 1)
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:943)
> at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:806)
> at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
> at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
> at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
> ... 13 more
> {code}
> Main job code:
> {code:java}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new RocksDBStateBackend(getString("flink.backend-state-dir")));
> // .........
> DataStream<Behavior> behaviorStream = anomalyStream
> .assignTimestampsAndWatermarks(new AnomalyTimestampExtractor(Time.seconds(0)))
> .keyBy((KeySelector<AnomalySlice, String>) value -> value.entity)
> .window(SlidingEventTimeWindows.of(Time.seconds(getLong("flink.window.window-size")),
> Time.seconds(getLong("flink.window.slice-size"))))
> .apply(new BehaviorBuilderFunction())
> .filter(new WhitelistFilterFunction())
> // non-keyed stream will result in pattern operator parallelism equal to 1.
> .keyBy((KeySelector<Behavior, String>) Behavior::getUser);
> // cep on behavior stream
> List<Pattern> allPatterns = PatternsHolder.getAllPatterns();
> for (Pattern pa : allPatterns) {
> PatternStream<Behavior> ps = CEP.pattern(behaviorStream, pa);
> ps.select(new AlertGenerator(pa.getName())).name(pa.getName());
> }
> {code}
> keyed stream event:
> {code:java}
> public class Behavior implements Serializable {
> private static final long serialVersionUID = 7786674623147772721L;
> static int ANOMALY_SCORE_THRESHOLD = 40;
> static int ANOMALY_COUNT_THRESHOLD = 3;
> public final String schema;
> public final String detector;
> private String measure = UEBAConstants.DEFAULT_MEASURE_FIELD;
> public final String dimension;
> public final String field; //dim value
> private String user;
> public String group;
> public double count;
> public int anomalyScore;
> protected String description;
> private Icon[] icons;
> private int adHashCode;
> private long startTimestamp;
> private long endTimestamp;
> private Map<Long, Double> timeMap;
> public ArrayList<HashMap<String, Object>> logQuery;
> public Behavior(String schema, String detector, String field, String dimension, String user,
> long fromMillis, long toMillis, double count, int anomalyScore, ArrayList<HashMap<String,
> Object>> logQuery) {
> this.schema = schema;
> this.detector = detector;
> this.field = field;
> this.dimension = dimension;
> this.user = user;
> this.startTimestamp = fromMillis;
> this.endTimestamp = toMillis;
> this.count = count;
> this.anomalyScore = anomalyScore;
> this.logQuery = logQuery;
> timeMap = new HashMap<>();
> timeMap.put(fromMillis, count);
> }
> public Behavior(String schema, String detector, String field, String dimension,
> long fromMillis, long toMillis, double count, int anomalyScore) {
> this.schema = schema;
> this.detector = detector;
> this.field = field;
> this.dimension = dimension;
> this.startTimestamp = fromMillis;
> this.endTimestamp = toMillis;
> this.count = count;
> this.anomalyScore = anomalyScore;
> timeMap = new HashMap<>();
> timeMap.put(fromMillis, count);
> }
> public String getGroup() {
> return group;
> }
> public void setGroup(String group) {
> this.group = group;
> }
> public void setAdHashCode(int hashCode) {
> this.adHashCode = hashCode;
> }
> public void setMeasure(String measure) {
> this.measure = measure;
> }
> public String getMeasure() {
> return measure;
> }
> // anomalyScore is using weighted average, may not be wise.
> public void add(long fromMillis, long toMillis, double count, int anomalyScore, ArrayList<HashMap<String,
> Object>> logQuery) {
> double sum = this.count * this.anomalyScore + count * anomalyScore;
> this.count += count;
> this.anomalyScore = (int) (sum / this.count);
> if (fromMillis < this.startTimestamp) {
> this.startTimestamp = fromMillis;
> }
> if (toMillis > this.endTimestamp) {
> this.endTimestamp = toMillis;
> }
> if (!timeMap.containsKey(fromMillis)) {
> timeMap.put(fromMillis, 0.0);
> }
> timeMap.put(fromMillis, timeMap.get(fromMillis) + count);
> if (logQuery != null) {
> this.logQuery.addAll(logQuery);
> }
> }
> public void add(long fromMillis, long toMillis, double count, int anomalyScore) {
> double sum = this.count * this.anomalyScore + count * anomalyScore;
> this.count += count;
> this.anomalyScore = (int) (sum / this.count);
> if (fromMillis < this.startTimestamp) {
> this.startTimestamp = fromMillis;
> }
> if (toMillis > this.endTimestamp) {
> this.endTimestamp = toMillis;
> }
> if (!timeMap.containsKey(fromMillis)) {
> timeMap.put(fromMillis, 0.0);
> }
> timeMap.put(fromMillis, timeMap.get(fromMillis) + count);
> }
> public Long[] getTimestamps() {
> return timeMap.keySet().toArray(new Long[timeMap.size()]);
> }
> public String dimension() {
> return dimension;
> }
> public long startTimestamp() {
> return startTimestamp;
> }
> public long endTimestamp() {
> return endTimestamp;
> }
> public double count() {
> return count;
> }
> public int anomalyScore() {
> return anomalyScore;
> }
> public boolean isAnomaly() {
> return anomalyScore() >= ANOMALY_SCORE_THRESHOLD && count() >= ANOMALY_COUNT_THRESHOLD;
> }
> public String getUser() {
> return user;
> }
> public void setUser(String user) {
> this.user = user;
> }
> public void describeAs(String description, Icon... icons) {
> this.description = description;
> this.icons = icons;
> }
> public String setVisualizeInterfaceParameter(String group, long visualizeStartTimestamp, long
> visualizeEndTimestamp) {
> String requestParameterString = "/get_alert_visualize?detectorName=" + detector + "&groupField=" + group +
> "&user=" + user + "&field=" + field + "&measureField=" + measure + "&schemaName=" + schema +
> "&dimensionField=" + dimension + "&visualizeStartTimestamp=" + visualizeStartTimestamp +
> "&visualizeEndTimestamp=" + visualizeEndTimestamp;
> return requestParameterString;
> }
> @Override
> public int hashCode() {
> int result;
> long temp;
> result = schema != null ? schema.hashCode() : 0;
> result = 31 * result + (detector != null ? detector.hashCode() : 0);
> result = 31 * result + (measure != null ? measure.hashCode() : 0);
> result = 31 * result + (field != null ? field.hashCode() : 0);
> result = 31 * result + (dimension != null ? dimension.hashCode() : 0);
> result = 31 * result + (description != null ? description.hashCode() : 0);
> result = 31 * result + Arrays.hashCode(icons);
> result = 31 * result + (int) (startTimestamp ^ (startTimestamp >>> 32));
> result = 31 * result + (int) (endTimestamp ^ (endTimestamp >>> 32));
> temp = Double.doubleToLongBits(count);
> result = 31 * result + (int) (temp ^ (temp >>> 32));
> result = 31 * result + anomalyScore;
> result = 31 * result + adHashCode;
> result = 31 * result + (timeMap != null ? timeMap.hashCode() : 0);
> result = 31 * result + (user != null ? user.hashCode() : 0);
> result = 31 * result + (logQuery != null ? logQuery.hashCode() : 0);
> result = 31 * result + (group != null ? group.hashCode() : 0);
> return result;
> }
> @Override
> public boolean equals(Object o) {
> if (this == o) return true;
> if (o == null || getClass() != o.getClass()) return false;
> Behavior behavior = (Behavior) o;
> if (startTimestamp != behavior.startTimestamp) return false;
> if (endTimestamp != behavior.endTimestamp) return false;
> if (Double.compare(behavior.count, count) != 0) return false;
> if (anomalyScore != behavior.anomalyScore) return false;
> if (adHashCode != behavior.adHashCode) return false;
> if (schema != null ? !schema.equals(behavior.schema) : behavior.schema != null)
> return false;
> if (detector != null ? !detector.equals(behavior.detector) : behavior.detector != null)
> return false;
> if (measure != null ? !measure.equals(behavior.measure) : behavior.measure != null)
> return false;
> if (field != null ? !field.equals(behavior.field) : behavior.field != null) return false;
> if (dimension != null ? !dimension.equals(behavior.dimension) : behavior.dimension != null)
> return false;
> if (description != null ? !description.equals(behavior.description) : behavior.description != null)
> return false;
> // Probably incorrect - comparing Object[] arrays with Arrays.equals
> if (!Arrays.equals(icons, behavior.icons)) return false;
> if (timeMap != null ? !timeMap.equals(behavior.timeMap) : behavior.timeMap != null)
> return false;
> if (user != null ? !user.equals(behavior.user) : behavior.user != null) return false;
> if (logQuery != null ? !logQuery.equals(behavior.logQuery) : behavior.logQuery != null)
> return false;
> return group != null ? group.equals(behavior.group) : behavior.group == null;
> }
> @Override
> public String toString() {
> return "Behavior{" +
> "schema='" + schema + '\'' +
> ", detector='" + detector + '\'' +
> ", measure='" + measure + '\'' +
> ", field='" + field + '\'' +
> ", dimension='" + dimension + '\'' +
> ", description='" + description + '\'' +
> ", icons=" + Arrays.toString(icons) +
> ", startTimestamp=" + startTimestamp +
> ", endTimestamp=" + endTimestamp +
> ", count=" + count +
> ", anomalyScore=" + anomalyScore +
> ", adHashCode=" + adHashCode +
> ", timeMap=" + timeMap +
> ", user='" + user + '\'' +
> ", logQuery=" + logQuery +
> ", group='" + group + '\'' +
> '}';
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)