Posted to dev@flink.apache.org by "jia liu (JIRA)" <ji...@apache.org> on 2017/12/13 08:15:00 UTC

[jira] [Created] (FLINK-8248) RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4

jia liu created FLINK-8248:
------------------------------

             Summary: RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4
                 Key: FLINK-8248
                 URL: https://issues.apache.org/jira/browse/FLINK-8248
             Project: Flink
          Issue Type: Bug
          Components: CEP, State Backends, Checkpointing
    Affects Versions: 1.3.2, 1.4.0
         Environment: linux: 3.10.0-514.el7.x86_64
flink:
*  version: 1.4
*  RocksDB state backend
*  checkpoint interval: 5s
*  keyed CEP
language: Java 8

            Reporter: jia liu


Here is my exception log:
```
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:291)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:103)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:309)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:247)
	at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:277)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:886)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
	... 7 more
Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon', detector='SlidingWindowAnomalyDetector', measure='count', field='activity', dimension='Logoff', description='null', icons=null, startTimestamp=1465297200000, endTimestamp=1465297203600, count=11.0, anomalyScore=100, adHashCode=-1866791453, timeMap={1465297200000=11.0}, user='LMR0049', logQuery=null, group='null'}, 1465300799999, 0), [SharedBufferEdge(null, 199)], 1)
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
	at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:943)
	at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:806)
	at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
	at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
	... 13 more
```

Main job code:

```Java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new RocksDBStateBackend(getString("flink.backend-state-dir")));
// .........
DataStream<Behavior> behaviorStream = anomalyStream
        .assignTimestampsAndWatermarks(new AnomalyTimestampExtractor(Time.seconds(0)))
        .keyBy((KeySelector<AnomalySlice, String>) value -> value.entity)
        .window(SlidingEventTimeWindows.of(Time.seconds(getLong("flink.window.window-size")),
                Time.seconds(getLong("flink.window.slice-size"))))
        .apply(new BehaviorBuilderFunction())
        .filter(new WhitelistFilterFunction())
        // non-keyed stream will result in pattern operator parallelism equal to 1.
        .keyBy((KeySelector<Behavior, String>) Behavior::getUser);

// cep on behavior stream
List<Pattern> allPatterns = PatternsHolder.getAllPatterns();
for (Pattern pa : allPatterns) {
    PatternStream<Behavior> ps = CEP.pattern(behaviorStream, pa);
    ps.select(new AlertGenerator(pa.getName())).name(pa.getName());
}
```
Keyed stream event class:

```Java
public class Behavior implements Serializable {
    private static final long serialVersionUID = 7786674623147772721L;

    static int ANOMALY_SCORE_THRESHOLD = 40;
    static int ANOMALY_COUNT_THRESHOLD = 3;

    public final String schema;
    public final String detector;
    private String measure = UEBAConstants.DEFAULT_MEASURE_FIELD;
    public final String dimension;
    public final String field; //dim value
    private String user;
    public String group;
    public double count;
    public int anomalyScore;
    protected String description;
    private Icon[] icons;
    private int adHashCode;
    private long startTimestamp;
    private long endTimestamp;
    private Map<Long, Double> timeMap;
    public ArrayList<HashMap<String, Object>> logQuery;

    public Behavior(String schema, String detector, String field, String dimension, String user,
                    long fromMillis, long toMillis, double count, int anomalyScore, ArrayList<HashMap<String,
            Object>> logQuery) {
        this.schema = schema;
        this.detector = detector;
        this.field = field;
        this.dimension = dimension;
        this.user = user;
        this.startTimestamp = fromMillis;
        this.endTimestamp = toMillis;
        this.count = count;
        this.anomalyScore = anomalyScore;
        this.logQuery = logQuery;
        timeMap = new HashMap<>();
        timeMap.put(fromMillis, count);
    }

    public Behavior(String schema, String detector, String field, String dimension,
                    long fromMillis, long toMillis, double count, int anomalyScore) {
        this.schema = schema;
        this.detector = detector;
        this.field = field;
        this.dimension = dimension;
        this.startTimestamp = fromMillis;
        this.endTimestamp = toMillis;
        this.count = count;
        this.anomalyScore = anomalyScore;
        timeMap = new HashMap<>();
        timeMap.put(fromMillis, count);
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public void setAdHashCode(int hashCode) {
        this.adHashCode = hashCode;
    }

    public void setMeasure(String measure) {
        this.measure = measure;
    }

    public String getMeasure() {
        return measure;
    }

    // anomalyScore uses a weighted average, which may not be wise.
    public void add(long fromMillis, long toMillis, double count, int anomalyScore, ArrayList<HashMap<String,
            Object>> logQuery) {
        double sum = this.count * this.anomalyScore + count * anomalyScore;
        this.count += count;
        this.anomalyScore = (int) (sum / this.count);

        if (fromMillis < this.startTimestamp) {
            this.startTimestamp = fromMillis;
        }
        if (toMillis > this.endTimestamp) {
            this.endTimestamp = toMillis;
        }
        if (!timeMap.containsKey(fromMillis)) {
            timeMap.put(fromMillis, 0.0);
        }
        timeMap.put(fromMillis, timeMap.get(fromMillis) + count);
        if (logQuery != null) {
            this.logQuery.addAll(logQuery);
        }
    }

    public void add(long fromMillis, long toMillis, double count, int anomalyScore) {
        double sum = this.count * this.anomalyScore + count * anomalyScore;
        this.count += count;
        this.anomalyScore = (int) (sum / this.count);

        if (fromMillis < this.startTimestamp) {
            this.startTimestamp = fromMillis;
        }
        if (toMillis > this.endTimestamp) {
            this.endTimestamp = toMillis;
        }
        if (!timeMap.containsKey(fromMillis)) {
            timeMap.put(fromMillis, 0.0);
        }
        timeMap.put(fromMillis, timeMap.get(fromMillis) + count);
    }

    public Long[] getTimestamps() {
        return timeMap.keySet().toArray(new Long[timeMap.size()]);
    }

    public String dimension() {
        return dimension;
    }

    public long startTimestamp() {
        return startTimestamp;
    }

    public long endTimestamp() {
        return endTimestamp;
    }

    public double count() {
        return count;
    }

    public int anomalyScore() {
        return anomalyScore;
    }

    public boolean isAnomaly() {
        return anomalyScore() >= ANOMALY_SCORE_THRESHOLD && count() >= ANOMALY_COUNT_THRESHOLD;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public void describeAs(String description, Icon... icons) {
        this.description = description;
        this.icons = icons;
    }

    public String setVisualizeInterfaceParameter(String group, long visualizeStartTimestamp, long
            visualizeEndTimestamp) {

        String requestParameterString = "/get_alert_visualize?detectorName=" + detector + "&groupField=" + group +
                "&user=" + user + "&field=" + field + "&measureField=" + measure + "&schemaName=" + schema +
                "&dimensionField=" + dimension + "&visualizeStartTimestamp=" + visualizeStartTimestamp +
                "&visualizeEndTimestamp=" + visualizeEndTimestamp;
        return requestParameterString;

    }

    @Override
    public int hashCode() {
        int result;
        long temp;
        result = schema != null ? schema.hashCode() : 0;
        result = 31 * result + (detector != null ? detector.hashCode() : 0);
        result = 31 * result + (measure != null ? measure.hashCode() : 0);
        result = 31 * result + (field != null ? field.hashCode() : 0);
        result = 31 * result + (dimension != null ? dimension.hashCode() : 0);
        result = 31 * result + (description != null ? description.hashCode() : 0);
        result = 31 * result + Arrays.hashCode(icons);
        result = 31 * result + (int) (startTimestamp ^ (startTimestamp >>> 32));
        result = 31 * result + (int) (endTimestamp ^ (endTimestamp >>> 32));
        temp = Double.doubleToLongBits(count);
        result = 31 * result + (int) (temp ^ (temp >>> 32));
        result = 31 * result + anomalyScore;
        result = 31 * result + adHashCode;
        result = 31 * result + (timeMap != null ? timeMap.hashCode() : 0);
        result = 31 * result + (user != null ? user.hashCode() : 0);
        result = 31 * result + (logQuery != null ? logQuery.hashCode() : 0);
        result = 31 * result + (group != null ? group.hashCode() : 0);
        return result;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        Behavior behavior = (Behavior) o;

        if (startTimestamp != behavior.startTimestamp) return false;
        if (endTimestamp != behavior.endTimestamp) return false;
        if (Double.compare(behavior.count, count) != 0) return false;
        if (anomalyScore != behavior.anomalyScore) return false;
        if (adHashCode != behavior.adHashCode) return false;
        if (schema != null ? !schema.equals(behavior.schema) : behavior.schema != null)
            return false;
        if (detector != null ? !detector.equals(behavior.detector) : behavior.detector != null)
            return false;
        if (measure != null ? !measure.equals(behavior.measure) : behavior.measure != null)
            return false;
        if (field != null ? !field.equals(behavior.field) : behavior.field != null) return false;
        if (dimension != null ? !dimension.equals(behavior.dimension) : behavior.dimension != null)
            return false;
        if (description != null ? !description.equals(behavior.description) : behavior.description != null)
            return false;
        // Probably incorrect - comparing Object[] arrays with Arrays.equals
        if (!Arrays.equals(icons, behavior.icons)) return false;
        if (timeMap != null ? !timeMap.equals(behavior.timeMap) : behavior.timeMap != null)
            return false;
        if (user != null ? !user.equals(behavior.user) : behavior.user != null) return false;
        if (logQuery != null ? !logQuery.equals(behavior.logQuery) : behavior.logQuery != null)
            return false;
        return group != null ? group.equals(behavior.group) : behavior.group == null;

    }

    @Override
    public String toString() {
        return "Behavior{" +
                "schema='" + schema + '\'' +
                ", detector='" + detector + '\'' +
                ", measure='" + measure + '\'' +
                ", field='" + field + '\'' +
                ", dimension='" + dimension + '\'' +
                ", description='" + description + '\'' +
                ", icons=" + Arrays.toString(icons) +
                ", startTimestamp=" + startTimestamp +
                ", endTimestamp=" + endTimestamp +
                ", count=" + count +
                ", anomalyScore=" + anomalyScore +
                ", adHashCode=" + adHashCode +
                ", timeMap=" + timeMap +
                ", user='" + user + '\'' +
                ", logQuery=" + logQuery +
                ", group='" + group + '\'' +
                '}';
    }
}
```
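A guess at a contributing factor (an assumption on my part, not confirmed): `Behavior.equals()`/`hashCode()` include mutable fields (`count`, `anomalyScore`, `timeMap`, ...) that `add()` changes after the event may already be stored in CEP's shared buffer, and the "Could not find id for entry" precondition failure looks like the classic symptom of looking up a key whose hash has changed. The `MutableKeyDemo`/`Event` classes below are hypothetical stand-ins, not Flink code; they only show the generic failure mode:

```Java
import java.util.HashMap;
import java.util.Map;

// Standalone demo: a hash-based structure cannot find an entry again
// once its key's hashCode()-relevant state has been mutated.
public class MutableKeyDemo {

    // Simplified stand-in for Behavior: equals/hashCode depend on a mutable field.
    static final class Event {
        double count;

        Event(double count) {
            this.count = count;
        }

        @Override
        public int hashCode() {
            return Double.hashCode(count);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Event && Double.compare(((Event) o).count, count) == 0;
        }
    }

    // Insert an event, mutate it (as Behavior.add() mutates count/timeMap),
    // then try to look it up again.
    static Integer lookupAfterMutation() {
        Map<Event, Integer> ids = new HashMap<>();
        Event e = new Event(11.0);
        ids.put(e, 199);
        e.count += 1.0; // hashCode changes, so the stored entry is unreachable
        return ids.get(e);
    }

    public static void main(String[] args) {
        System.out.println(lookupAfterMutation()); // prints "null"
    }
}
```

If this is the cause, making the event immutable (or restricting `equals`/`hashCode` to immutable fields) should be testable as a workaround.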



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)