You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "jia liu (JIRA)" <ji...@apache.org> on 2017/12/13 08:15:00 UTC
[jira] [Created] (FLINK-8248) RocksDB state backend Checkpointing
is not working with KeyedCEP in 1.4
jia liu created FLINK-8248:
------------------------------
Summary: RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4
Key: FLINK-8248
URL: https://issues.apache.org/jira/browse/FLINK-8248
Project: Flink
Issue Type: Bug
Components: CEP, State Backends, Checkpointing
Affects Versions: 1.3.2, 1.4.0
Environment: linux: 3.10.0-514.el7.x86_64
flink:
* version: 1.4
* rocksdb backend state
* checkpoint interval 5s
* keyed cep
language: Java8
Reporter: jia liu
Here is the exception log:
```
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:291)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:103)
at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:309)
at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:247)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:277)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:886)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
... 7 more
Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon', detector='SlidingWindowAnomalyDetector', measure='count', field='activity', dimension='Logoff', description='null', icons=null, startTimestamp=1465297200000, endTimestamp=1465297203600, count=11.0, anomalyScore=100, adHashCode=-1866791453, timeMap={1465297200000=11.0}, user='LMR0049', logQuery=null, group='null'}, 1465300799999, 0), [SharedBufferEdge(null, 199)], 1)
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:943)
at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:806)
at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
... 13 more
```
Main job code:
```Java
// Execution environment: event-time semantics, exactly-once checkpoints every
// 5 seconds, state persisted in the RocksDB backend (path from configuration).
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new RocksDBStateBackend(getString("flink.backend-state-dir")));
// .........
// Build Behavior events from anomaly slices: assign timestamps/watermarks,
// key by entity, aggregate over sliding event-time windows (sizes from config),
// drop whitelisted behaviors, then re-key by user for the CEP operators below.
DataStream<Behavior> behaviorStream = anomalyStream
.assignTimestampsAndWatermarks(new AnomalyTimestampExtractor(Time.seconds(0)))
.keyBy((KeySelector<AnomalySlice, String>) value -> value.entity)
.window(SlidingEventTimeWindows.of(Time.seconds(getLong("flink.window.window-size")),
Time.seconds(getLong("flink.window.slice-size"))))
.apply(new BehaviorBuilderFunction())
.filter(new WhitelistFilterFunction())
// non-keyed stream will result in pattern operator parallelism equal to 1.
.keyBy((KeySelector<Behavior, String>) Behavior::getUser);
// cep on behavior stream
// One keyed CEP pattern operator per configured pattern; each select() emits
// alerts via an AlertGenerator named after its pattern.
List<Pattern> allPatterns = PatternsHolder.getAllPatterns();
for (Pattern pa : allPatterns) {
PatternStream<Behavior> ps = CEP.pattern(behaviorStream, pa);
ps.select(new AlertGenerator(pa.getName())).name(pa.getName());
}
```
keyed stream event:
```Java
/**
 * Event type flowing through the keyed CEP stream: an aggregated anomaly
 * observation for one (schema, detector, dimension, field, user), accumulated
 * across time windows via {@link #add}.
 *
 * <p>NOTE(review): this class is mutable, yet {@code equals()}/{@code hashCode()}
 * are computed from the mutable fields. Flink CEP's SharedBuffer looks entries up
 * by value equality, so mutating an instance after it has entered NFA state could
 * plausibly produce the "Could not find id for entry" failure in this report —
 * confirm before mutating events already handed to the pattern operator.
 */
public class Behavior implements Serializable {

    private static final long serialVersionUID = 7786674623147772721L;

    // Thresholds consulted by isAnomaly(); package-private and mutable by design.
    static int ANOMALY_SCORE_THRESHOLD = 40;
    static int ANOMALY_COUNT_THRESHOLD = 3;

    public final String schema;
    public final String detector;
    private String measure = UEBAConstants.DEFAULT_MEASURE_FIELD;
    public final String dimension;
    public final String field; // dim value
    private String user;
    public String group;
    public double count;
    public int anomalyScore;
    protected String description;
    private Icon[] icons;
    private int adHashCode;
    private long startTimestamp;
    private long endTimestamp;
    // Per-window counts keyed by window start timestamp (millis).
    private Map<Long, Double> timeMap;
    public ArrayList<HashMap<String, Object>> logQuery;

    /**
     * Full constructor.
     *
     * @param logQuery optional raw-log query payload; may be {@code null}
     */
    public Behavior(String schema, String detector, String field, String dimension, String user,
                    long fromMillis, long toMillis, double count, int anomalyScore,
                    ArrayList<HashMap<String, Object>> logQuery) {
        this.schema = schema;
        this.detector = detector;
        this.field = field;
        this.dimension = dimension;
        this.user = user;
        this.startTimestamp = fromMillis;
        this.endTimestamp = toMillis;
        this.count = count;
        this.anomalyScore = anomalyScore;
        this.logQuery = logQuery;
        timeMap = new HashMap<>();
        timeMap.put(fromMillis, count);
    }

    /** Convenience constructor without user/logQuery; delegates to the full one. */
    public Behavior(String schema, String detector, String field, String dimension,
                    long fromMillis, long toMillis, double count, int anomalyScore) {
        this(schema, detector, field, dimension, null, fromMillis, toMillis, count, anomalyScore,
                null);
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public void setAdHashCode(int hashCode) {
        this.adHashCode = hashCode;
    }

    public void setMeasure(String measure) {
        this.measure = measure;
    }

    public String getMeasure() {
        return measure;
    }

    // anomalyScore is using weighted average, may not be wise.
    /**
     * Merges another observation into this one: counts are summed, the anomaly
     * score becomes the count-weighted average, and the covered time range grows.
     *
     * @param logQuery extra raw-log query entries to append; may be {@code null}
     */
    public void add(long fromMillis, long toMillis, double count, int anomalyScore,
                    ArrayList<HashMap<String, Object>> logQuery) {
        double sum = this.count * this.anomalyScore + count * anomalyScore;
        this.count += count;
        this.anomalyScore = (int) (sum / this.count);
        this.startTimestamp = Math.min(this.startTimestamp, fromMillis);
        this.endTimestamp = Math.max(this.endTimestamp, toMillis);
        timeMap.merge(fromMillis, count, Double::sum);
        if (logQuery != null) {
            // FIX: this.logQuery is null when the short constructor was used;
            // the original code NPE'd here instead of starting a fresh list.
            if (this.logQuery == null) {
                this.logQuery = new ArrayList<>();
            }
            this.logQuery.addAll(logQuery);
        }
    }

    /** Same as the 5-arg overload with no log-query payload. */
    public void add(long fromMillis, long toMillis, double count, int anomalyScore) {
        add(fromMillis, toMillis, count, anomalyScore, null);
    }

    /** @return the window-start timestamps observed so far, in no particular order */
    public Long[] getTimestamps() {
        return timeMap.keySet().toArray(new Long[timeMap.size()]);
    }

    public String dimension() {
        return dimension;
    }

    public long startTimestamp() {
        return startTimestamp;
    }

    public long endTimestamp() {
        return endTimestamp;
    }

    public double count() {
        return count;
    }

    public int anomalyScore() {
        return anomalyScore;
    }

    /** True when both the score and count thresholds are reached. */
    public boolean isAnomaly() {
        return anomalyScore() >= ANOMALY_SCORE_THRESHOLD && count() >= ANOMALY_COUNT_THRESHOLD;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    /** Attaches a human-readable description and display icons. */
    public void describeAs(String description, Icon... icons) {
        this.description = description;
        this.icons = icons;
    }

    /**
     * Builds the visualization request path for this behavior.
     *
     * @return a "/get_alert_visualize?..." query string (values are not URL-encoded)
     */
    public String setVisualizeInterfaceParameter(String group, long visualizeStartTimestamp, long
            visualizeEndTimestamp) {
        String requestParameterString = "/get_alert_visualize?detectorName=" + detector + "&groupField=" + group +
                "&user=" + user + "&field=" + field + "&measureField=" + measure + "&schemaName=" + schema +
                "&dimensionField=" + dimension + "&visualizeStartTimestamp=" + visualizeStartTimestamp +
                "&visualizeEndTimestamp=" + visualizeEndTimestamp;
        return requestParameterString;
    }

    // NOTE(review): hashCode/equals include mutable fields (count, timeMap, ...);
    // the computation is kept exactly as the original so hash values are stable
    // across this change.
    @Override
    public int hashCode() {
        int result;
        long temp;
        result = schema != null ? schema.hashCode() : 0;
        result = 31 * result + (detector != null ? detector.hashCode() : 0);
        result = 31 * result + (measure != null ? measure.hashCode() : 0);
        result = 31 * result + (field != null ? field.hashCode() : 0);
        result = 31 * result + (dimension != null ? dimension.hashCode() : 0);
        result = 31 * result + (description != null ? description.hashCode() : 0);
        result = 31 * result + Arrays.hashCode(icons);
        result = 31 * result + (int) (startTimestamp ^ (startTimestamp >>> 32));
        result = 31 * result + (int) (endTimestamp ^ (endTimestamp >>> 32));
        temp = Double.doubleToLongBits(count);
        result = 31 * result + (int) (temp ^ (temp >>> 32));
        result = 31 * result + anomalyScore;
        result = 31 * result + adHashCode;
        result = 31 * result + (timeMap != null ? timeMap.hashCode() : 0);
        result = 31 * result + (user != null ? user.hashCode() : 0);
        result = 31 * result + (logQuery != null ? logQuery.hashCode() : 0);
        result = 31 * result + (group != null ? group.hashCode() : 0);
        return result;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Behavior behavior = (Behavior) o;
        if (startTimestamp != behavior.startTimestamp) return false;
        if (endTimestamp != behavior.endTimestamp) return false;
        if (Double.compare(behavior.count, count) != 0) return false;
        if (anomalyScore != behavior.anomalyScore) return false;
        if (adHashCode != behavior.adHashCode) return false;
        if (schema != null ? !schema.equals(behavior.schema) : behavior.schema != null)
            return false;
        if (detector != null ? !detector.equals(behavior.detector) : behavior.detector != null)
            return false;
        if (measure != null ? !measure.equals(behavior.measure) : behavior.measure != null)
            return false;
        if (field != null ? !field.equals(behavior.field) : behavior.field != null) return false;
        if (dimension != null ? !dimension.equals(behavior.dimension) : behavior.dimension != null)
            return false;
        if (description != null ? !description.equals(behavior.description) : behavior.description != null)
            return false;
        if (!Arrays.equals(icons, behavior.icons)) return false;
        if (timeMap != null ? !timeMap.equals(behavior.timeMap) : behavior.timeMap != null)
            return false;
        if (user != null ? !user.equals(behavior.user) : behavior.user != null) return false;
        if (logQuery != null ? !logQuery.equals(behavior.logQuery) : behavior.logQuery != null)
            return false;
        return group != null ? group.equals(behavior.group) : behavior.group == null;
    }

    @Override
    public String toString() {
        return "Behavior{" +
                "schema='" + schema + '\'' +
                ", detector='" + detector + '\'' +
                ", measure='" + measure + '\'' +
                ", field='" + field + '\'' +
                ", dimension='" + dimension + '\'' +
                ", description='" + description + '\'' +
                ", icons=" + Arrays.toString(icons) +
                ", startTimestamp=" + startTimestamp +
                ", endTimestamp=" + endTimestamp +
                ", count=" + count +
                ", anomalyScore=" + anomalyScore +
                ", adHashCode=" + adHashCode +
                ", timeMap=" + timeMap +
                ", user='" + user + '\'' +
                ", logQuery=" + logQuery +
                ", group='" + group + '\'' +
                '}';
    }
}
```
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)