You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/02/24 10:12:35 UTC
flink git commit: [FLINK-5845] [cep] Unify keyed and non-keyed
operators.
Repository: flink
Updated Branches:
refs/heads/master dd8ef550c -> 15ae922ad
[FLINK-5845] [cep] Unify keyed and non-keyed operators.
All CEP operators are now keyed. For the non-keyed
use cases, we key the stream on a dummy key and use the keyed operator.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/15ae922a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/15ae922a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/15ae922a
Branch: refs/heads/master
Commit: 15ae922ad4151701cbb4e0df207f43d0094366d1
Parents: dd8ef55
Author: kl0u <kk...@gmail.com>
Authored: Thu Feb 16 12:02:25 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Fri Feb 24 11:10:40 2017 +0100
----------------------------------------------------------------------
.../flink/api/java/functions/KeySelector.java | 2 +-
.../api/java/functions/NullByteKeySelector.java | 39 ++++
.../AbstractCEPBasePatternOperator.java | 108 ------------
.../operator/AbstractCEPPatternOperator.java | 142 ---------------
.../AbstractKeyedCEPPatternOperator.java | 128 ++++++++++----
.../flink/cep/operator/CEPOperatorUtils.java | 23 ++-
.../flink/cep/operator/CEPPatternOperator.java | 76 --------
.../cep/operator/KeyedCEPPatternOperator.java | 22 +--
.../cep/operator/TimeoutCEPPatternOperator.java | 94 ----------
.../TimeoutKeyedCEPPatternOperator.java | 43 ++---
.../flink/cep/operator/CEPOperatorTest.java | 176 -------------------
.../api/datastream/AllWindowedStream.java | 14 +-
12 files changed, 186 insertions(+), 681 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
index d96f078..da3b3e2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
@@ -34,7 +34,7 @@ import java.io.Serializable;
*/
@Public
public interface KeySelector<IN, KEY> extends Function, Serializable {
-
+
/**
* User-defined function that extracts the key from an arbitrary object.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java b/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java
new file mode 100644
index 0000000..4aa533d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Used as a dummy {@link KeySelector} to allow using keyed operators
+ * for non-keyed usecases. Essentially, it gives all incoming records
+ * the same key, which is a {@code (byte) 0} value.
+ *
+ * @param <T> The type of the input element.
+ */
+@Internal
+public class NullByteKeySelector<T> implements KeySelector<T, Byte> {
+
+ private static final long serialVersionUID = 614256539098549020L;
+
+ @Override
+ public Byte getKey(T value) throws Exception {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
deleted file mode 100644
index a3497a6..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-import java.util.PriorityQueue;
-
-/**
- * Base class for CEP pattern operator. The operator uses a {@link NFA} to detect complex event
- * patterns. The detected event patterns are then outputted to the down stream operators.
- *
- * @param <IN> Type of the input elements
- * @param <OUT> Type fo the output elements
- */
-public abstract class AbstractCEPBasePatternOperator<IN, OUT>
- extends AbstractStreamOperator<OUT>
- implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator {
-
- private static final long serialVersionUID = -4166778210774160757L;
-
- protected static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
-
- private final TypeSerializer<IN> inputSerializer;
- private final boolean isProcessingTime;
-
- public AbstractCEPBasePatternOperator(
- final TypeSerializer<IN> inputSerializer,
- final boolean isProcessingTime) {
- this.inputSerializer = inputSerializer;
- this.isProcessingTime = isProcessingTime;
- }
-
- public TypeSerializer<IN> getInputSerializer() {
- return inputSerializer;
- }
-
- protected abstract NFA<IN> getNFA() throws IOException;
-
- protected abstract void updateNFA(NFA<IN> nfa) throws IOException;
-
- protected abstract PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException;
-
- protected abstract void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException;
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- if (isProcessingTime) {
- // there can be no out of order elements in processing time
- NFA<IN> nfa = getNFA();
- processEvent(nfa, element.getValue(), System.currentTimeMillis());
- updateNFA(nfa);
- } else {
- PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
-
- // event time processing
- // we have to buffer the elements until we receive the proper watermark
- if (getExecutionConfig().isObjectReuseEnabled()) {
- // copy the StreamRecord so that it cannot be changed
- priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
- } else {
- priorityQueue.offer(element);
- }
- updatePriorityQueue(priorityQueue);
- }
- }
-
- /**
- * Process the given event by giving it to the NFA and outputting the produced set of matched
- * event sequences.
- *
- * @param nfa NFA to be used for the event detection
- * @param event The current event to be processed
- * @param timestamp The timestamp of the event
- */
- protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp);
-
- /**
- * Advances the time for the given NFA to the given timestamp. This can lead to pruning and
- * timeouts.
- *
- * @param nfa to advance the time for
- * @param timestamp to advance the time to
- */
- protected abstract void advanceTime(NFA<IN> nfa, long timestamp);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
deleted file mode 100644
index fe9aced..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.PriorityQueue;
-
-/**
- * Abstract CEP pattern operator which is used for non keyed streams. Consequently,
- * the operator state only includes a single {@link NFA} and a priority queue to order out of order
- * elements in case of event time processing.
- *
- * @param <IN> Type of the input elements
- * @param <OUT> Type of the output elements
- */
-abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> {
- private static final long serialVersionUID = 7487334510746595640L;
-
- private final StreamElementSerializer<IN> streamRecordSerializer;
-
- // global nfa for all elements
- private NFA<IN> nfa;
-
- // queue to buffer out of order stream records
- private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
-
- public AbstractCEPPatternOperator(
- TypeSerializer<IN> inputSerializer,
- boolean isProcessingTime,
- NFACompiler.NFAFactory<IN> nfaFactory) {
- super(inputSerializer, isProcessingTime);
-
- this.streamRecordSerializer = new StreamElementSerializer<>(inputSerializer);
- this.nfa = nfaFactory.createNFA();
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- if (priorityQueue == null) {
- priorityQueue = new PriorityQueue<StreamRecord<IN>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>());
- }
- }
-
- @Override
- protected NFA<IN> getNFA() throws IOException {
- return nfa;
- }
-
- @Override
- protected void updateNFA(NFA<IN> nfa) {
- // a no-op, because we only have one NFA
- }
-
- @Override
- protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
- return priorityQueue;
- }
-
- @Override
- protected void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) {
- // a no-op, because we only have one priority queue
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- // we do our own watermark handling, no super call. we will never be able to use
- // the timer service like this, however.
-
- if (priorityQueue.isEmpty()) {
- advanceTime(nfa, mark.getTimestamp());
- } else {
- while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
- StreamRecord<IN> streamRecord = priorityQueue.poll();
-
- processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
- }
- }
-
- output.emitWatermark(mark);
- }
-
- @Override
- public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
- final ObjectOutputStream oos = new ObjectOutputStream(out);
-
- oos.writeObject(nfa);
- oos.writeInt(priorityQueue.size());
-
- for (StreamRecord<IN> streamRecord: priorityQueue) {
- streamRecordSerializer.serialize(streamRecord, new DataOutputViewStreamWrapper(oos));
- }
- oos.flush();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void restoreState(FSDataInputStream state) throws Exception {
- final ObjectInputStream ois = new ObjectInputStream(state);
-
- nfa = (NFA<IN>)ois.readObject();
-
- int numberPriorityQueueEntries = ois.readInt();
-
- priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
-
- for (int i = 0; i <numberPriorityQueueEntries; i++) {
- StreamElement streamElement = streamRecordSerializer.deserialize(new DataInputViewStreamWrapper(ois));
- priorityQueue.offer(streamElement.<IN>asRecord());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 832a0ba..90ee846 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -30,9 +30,13 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.Serializable;
@@ -44,7 +48,7 @@ import java.util.Set;
/**
* Abstract CEP pattern operator for a keyed input stream. For each key, the operator creates
* a {@link NFA} and a priority queue to buffer out of order elements. Both data structures are
- * stored using the key value state. Additionally, the set of all seen keys is kept as part of the
+ * stored using the managed keyed state. Additionally, the set of all seen keys is kept as part of the
* operator state. This is necessary to trigger the execution for all keys upon receiving a new
* watermark.
*
@@ -52,11 +56,17 @@ import java.util.Set;
* @param <KEY> Type of the key on which the input stream is keyed
* @param <OUT> Type of the output elements
*/
-abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> {
- private static final long serialVersionUID = -7234999752950159178L;
+public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
+ extends AbstractStreamOperator<OUT>
+ implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator {
- private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorState";
- private static final String PRIORIRY_QUEUE_STATE_NAME = "priorityQueueStateName";
+ private static final long serialVersionUID = -4166778210774160757L;
+
+ private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+
+ private final boolean isProcessingTime;
+
+ private final TypeSerializer<IN> inputSerializer;
// necessary to extract the key from the input elements
private final KeySelector<IN, KEY> keySelector;
@@ -64,29 +74,38 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
// necessary to serialize the set of seen keys
private final TypeSerializer<KEY> keySerializer;
- private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
- private final NFACompiler.NFAFactory<IN> nfaFactory;
+ /////////////// State //////////////
// stores the keys we've already seen to trigger execution upon receiving a watermark
// this can be problematic, since it is never cleared
// TODO: fix once the state refactoring is completed
private transient Set<KEY> keys;
+ private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorState";
+ private static final String PRIORITY_QUEUE_STATE_NAME = "priorityQueueStateName";
+
private transient ValueState<NFA<IN>> nfaOperatorState;
private transient ValueState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState;
- public AbstractKeyedCEPPatternOperator(
- TypeSerializer<IN> inputSerializer,
- boolean isProcessingTime,
- KeySelector<IN, KEY> keySelector,
- TypeSerializer<KEY> keySerializer,
- NFACompiler.NFAFactory<IN> nfaFactory) {
- super(inputSerializer, isProcessingTime);
+ private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
+ private final NFACompiler.NFAFactory<IN> nfaFactory;
- this.keySelector = keySelector;
- this.keySerializer = keySerializer;
+ public AbstractKeyedCEPPatternOperator(
+ final TypeSerializer<IN> inputSerializer,
+ final boolean isProcessingTime,
+ final KeySelector<IN, KEY> keySelector,
+ final TypeSerializer<KEY> keySerializer,
+ final NFACompiler.NFAFactory<IN> nfaFactory) {
+
+ this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
+ this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime);
+ this.keySelector = Preconditions.checkNotNull(keySelector);
+ this.keySerializer = Preconditions.checkNotNull(keySerializer);
+ this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
+ }
- this.nfaFactory = nfaFactory;
+ public TypeSerializer<IN> getInputSerializer() {
+ return inputSerializer;
}
@Override
@@ -100,27 +119,27 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
if (nfaOperatorState == null) {
nfaOperatorState = getPartitionedState(
- new ValueStateDescriptor<NFA<IN>>(
- NFA_OPERATOR_STATE_NAME,
- new NFA.Serializer<IN>()));
+ new ValueStateDescriptor<>(NFA_OPERATOR_STATE_NAME, new NFA.Serializer<IN>()));
}
@SuppressWarnings("unchecked,rawtypes")
TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
- (TypeSerializer) new StreamElementSerializer<>(getInputSerializer());
+ (TypeSerializer) new StreamElementSerializer<>(getInputSerializer());
if (priorityQueueOperatorState == null) {
priorityQueueOperatorState = getPartitionedState(
- new ValueStateDescriptor<>(
- PRIORIRY_QUEUE_STATE_NAME,
- new PriorityQueueSerializer<>(
- streamRecordSerializer,
- new PriorityQueueStreamRecordFactory<IN>())));
+ new ValueStateDescriptor<>(
+ PRIORITY_QUEUE_STATE_NAME,
+ new PriorityQueueSerializer<>(
+ streamRecordSerializer,
+ new PriorityQueueStreamRecordFactory<IN>()
+ )
+ )
+ );
}
}
- @Override
- protected NFA<IN> getNFA() throws IOException {
+ private NFA<IN> getNFA() throws IOException {
NFA<IN> nfa = nfaOperatorState.value();
if (nfa == null) {
@@ -132,13 +151,11 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
return nfa;
}
- @Override
- protected void updateNFA(NFA<IN> nfa) throws IOException {
+ private void updateNFA(NFA<IN> nfa) throws IOException {
nfaOperatorState.update(nfa);
}
- @Override
- protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
+ private PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueOperatorState.value();
if (priorityQueue == null) {
@@ -150,8 +167,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
return priorityQueue;
}
- @Override
- protected void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException {
+ private void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException {
priorityQueueOperatorState.update(queue);
}
@@ -159,7 +175,24 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
public void processElement(StreamRecord<IN> element) throws Exception {
keys.add(keySelector.getKey(element.getValue()));
- super.processElement(element);
+ if (isProcessingTime) {
+ // there can be no out of order elements in processing time
+ NFA<IN> nfa = getNFA();
+ processEvent(nfa, element.getValue(), System.currentTimeMillis());
+ updateNFA(nfa);
+ } else {
+ PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+
+ // event time processing
+ // we have to buffer the elements until we receive the proper watermark
+ if (getExecutionConfig().isObjectReuseEnabled()) {
+ // copy the StreamRecord so that it cannot be changed
+ priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
+ } else {
+ priorityQueue.offer(element);
+ }
+ updatePriorityQueue(priorityQueue);
+ }
}
@Override
@@ -175,7 +208,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
NFA<IN> nfa = getNFA();
if (priorityQueue.isEmpty()) {
- advanceTime(nfa, mark.getTimestamp());
+ advanceTime(nfa, mark.getTimestamp());
} else {
while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
StreamRecord<IN> streamRecord = priorityQueue.poll();
@@ -191,6 +224,25 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
output.emitWatermark(mark);
}
+ /**
+ * Process the given event by giving it to the NFA and outputting the produced set of matched
+ * event sequences.
+ *
+ * @param nfa NFA to be used for the event detection
+ * @param event The current event to be processed
+ * @param timestamp The timestamp of the event
+ */
+ protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp);
+
+ /**
+ * Advances the time for the given NFA to the given timestamp. This can lead to pruning and
+ * timeouts.
+ *
+ * @param nfa to advance the time for
+ * @param timestamp to advance the time to
+ */
+ protected abstract void advanceTime(NFA<IN> nfa, long timestamp);
+
@Override
public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
DataOutputView ov = new DataOutputViewStreamWrapper(out);
@@ -216,6 +268,8 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
}
}
+ ////////////////////// Utility Classes //////////////////////
+
/**
* Custom type serializer implementation to serialize priority queues.
*
@@ -228,7 +282,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
private final TypeSerializer<T> elementSerializer;
private final PriorityQueueFactory<T> factory;
- public PriorityQueueSerializer(final TypeSerializer<T> elementSerializer, final PriorityQueueFactory<T> factory) {
+ PriorityQueueSerializer(final TypeSerializer<T> elementSerializer, final PriorityQueueFactory<T> factory) {
this.elementSerializer = elementSerializer;
this.factory = factory;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index 39e2ccd..36f2e7a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -21,7 +21,9 @@ package org.apache.flink.cep.operator;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.NullByteKeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -72,12 +74,18 @@ public class CEPOperatorUtils {
keySerializer,
nfaFactory));
} else {
- patternStream = inputStream.transform(
+
+ KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
+ TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
+
+ patternStream = inputStream.keyBy(keySelector).transform(
"CEPPatternOperator",
(TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
- new CEPPatternOperator<>(
+ new KeyedCEPPatternOperator<>(
inputSerializer,
isProcessingTime,
+ keySelector,
+ keySerializer,
nfaFactory
)).forceNonParallel();
}
@@ -127,12 +135,19 @@ public class CEPOperatorUtils {
keySerializer,
nfaFactory));
} else {
- patternStream = inputStream.transform(
+
+ KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
+
+ TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
+
+ patternStream = inputStream.keyBy(keySelector).transform(
"TimeoutCEPPatternOperator",
eitherTypeInformation,
- new TimeoutCEPPatternOperator<>(
+ new TimeoutKeyedCEPPatternOperator<>(
inputSerializer,
isProcessingTime,
+ keySelector,
+ keySerializer,
nfaFactory
)).forceNonParallel();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
deleted file mode 100644
index 57f27c2..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * CEP pattern operator which only returns fully matched event patterns stored in a {@link Map}. The
- * events are indexed by the event names associated in the pattern specification.
- *
- * @param <IN> Type of the input events
- */
-public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN, Map<String, IN>> {
- private static final long serialVersionUID = 376300194236250645L;
-
- public CEPPatternOperator(TypeSerializer<IN> inputSerializer, boolean isProcessingTime, NFACompiler.NFAFactory<IN> nfaFactory) {
- super(inputSerializer, isProcessingTime, nfaFactory);
- }
-
- @Override
- protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
- Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
- event,
- timestamp);
-
- Collection<Map<String, IN>> matchedPatterns = patterns.f0;
-
- emitMatchedSequences(matchedPatterns, timestamp);
- }
-
- @Override
- protected void advanceTime(NFA<IN> nfa, long timestamp) {
- Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
-
- emitMatchedSequences(patterns.f0, timestamp);
- }
-
- private void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
- Iterator<Map<String, IN>> iterator = matchedSequences.iterator();
-
- if (iterator.hasNext()) {
- StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String, IN>>(
- null,
- timestamp);
-
- do {
- streamRecord.replace(iterator.next());
- output.collect(streamRecord);
- } while (iterator.hasNext());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 4d8a907..5b6ffe2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -40,25 +40,27 @@ import java.util.Map;
public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Map<String, IN>> {
private static final long serialVersionUID = 5328573789532074581L;
- public KeyedCEPPatternOperator(TypeSerializer<IN> inputSerializer, boolean isProcessingTime, KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, NFACompiler.NFAFactory<IN> nfaFactory) {
+ public KeyedCEPPatternOperator(
+ TypeSerializer<IN> inputSerializer,
+ boolean isProcessingTime,
+ KeySelector<IN, KEY> keySelector,
+ TypeSerializer<KEY> keySerializer,
+ NFACompiler.NFAFactory<IN> nfaFactory) {
+
super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory);
}
@Override
protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
- Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
- event,
- timestamp);
-
- Collection<Map<String, IN>> matchedPatterns = patterns.f0;
-
- emitMatchedSequences(matchedPatterns, timestamp);
+ Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+ nfa.process(event, timestamp);
+ emitMatchedSequences(patterns.f0, timestamp);
}
@Override
protected void advanceTime(NFA<IN> nfa, long timestamp) {
- Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
-
+ Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+ nfa.process(null, timestamp);
emitMatchedSequences(patterns.f0, timestamp);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
deleted file mode 100644
index 9a04468..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.types.Either;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * CEP pattern operator which only returns fully matched event patterns and partially matched event
- * patterns which have timed out wrapped in {@link Either}. The matched events are stored in a
- * {@link Map} and are indexed by the event names associated in the pattern specification.
- *
- * The fully matched event patterns are returned as a {@link Either.Right} instance and the
- * partially matched event patterns are returned as a {@link Either.Left} instance.
- *
- * @param <IN> Type of the input events
- */
-public class TimeoutCEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN, Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> {
- private static final long serialVersionUID = -3911002597290988201L;
-
- public TimeoutCEPPatternOperator(
- TypeSerializer<IN> inputSerializer,
- boolean isProcessingTime,
- NFACompiler.NFAFactory<IN> nfaFactory) {
-
- super(inputSerializer, isProcessingTime, nfaFactory);
- }
-
- @Override
- protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
- Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
- event,
- timestamp);
-
- Collection<Map<String, IN>> matchedPatterns = patterns.f0;
- Collection<Tuple2<Map<String, IN>, Long>> partialPatterns = patterns.f1;
-
- emitMatchedSequences(matchedPatterns, timestamp);
- emitTimedOutSequences(partialPatterns, timestamp);
- }
-
- @Override
- protected void advanceTime(NFA<IN> nfa, long timestamp) {
- Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
-
- emitMatchedSequences(patterns.f0, timestamp);
- emitTimedOutSequences(patterns.f1, timestamp);
- }
-
- private void emitTimedOutSequences(Iterable<Tuple2<Map<String, IN>, Long>> timedOutSequences, long timestamp) {
- StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
- null,
- timestamp);
-
- for (Tuple2<Map<String, IN>, Long> partialPattern: timedOutSequences) {
- streamRecord.replace(Either.Left(partialPattern));
- output.collect(streamRecord);
- }
- }
-
- protected void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
- StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
- null,
- timestamp);
-
- for (Map<String, IN> matchedPattern : matchedSequences) {
- streamRecord.replace(Either.Right(matchedPattern));
- output.collect(streamRecord);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 4d33435..6889bb9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -29,44 +29,48 @@ import org.apache.flink.types.Either;
import java.util.Collection;
import java.util.Map;
+/**
+ * CEP pattern operator which returns fully and partially matched (timed-out) event patterns stored in a
+ * {@link Map}. The events are indexed by the event names associated in the pattern specification. The
+ * operator works on keyed input data.
+ *
+ * @param <IN> Type of the input events
+ * @param <KEY> Type of the key
+ */
public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> {
private static final long serialVersionUID = 3570542177814518158L;
public TimeoutKeyedCEPPatternOperator(
- TypeSerializer<IN> inputSerializer,
- boolean isProcessingTime,
- KeySelector<IN, KEY> keySelector,
- TypeSerializer<KEY> keySerializer,
- NFACompiler.NFAFactory<IN> nfaFactory) {
+ TypeSerializer<IN> inputSerializer,
+ boolean isProcessingTime,
+ KeySelector<IN, KEY> keySelector,
+ TypeSerializer<KEY> keySerializer,
+ NFACompiler.NFAFactory<IN> nfaFactory) {
super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory);
}
@Override
protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
- Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
- event,
- timestamp);
-
- Collection<Map<String, IN>> matchedSequences = patterns.f0;
- Collection<Tuple2<Map<String, IN>, Long>> timedOutSequences = patterns.f1;
+ Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+ nfa.process(event, timestamp);
- emitMatchedSequences(matchedSequences, timestamp);
- emitTimedOutSequences(timedOutSequences, timestamp);
+ emitMatchedSequences(patterns.f0, timestamp);
+ emitTimedOutSequences(patterns.f1, timestamp);
}
@Override
protected void advanceTime(NFA<IN> nfa, long timestamp) {
- Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
+ Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+ nfa.process(null, timestamp);
emitMatchedSequences(patterns.f0, timestamp);
emitTimedOutSequences(patterns.f1, timestamp);
}
private void emitTimedOutSequences(Iterable<Tuple2<Map<String, IN>, Long>> timedOutSequences, long timestamp) {
- StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
- null,
- timestamp);
+ StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord =
+ new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(null, timestamp);
for (Tuple2<Map<String, IN>, Long> partialPattern: timedOutSequences) {
streamRecord.replace(Either.Left(partialPattern));
@@ -75,9 +79,8 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
}
protected void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
- StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
- null,
- timestamp);
+ StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord =
+ new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(null, timestamp);
for (Map<String, IN> matchedPattern : matchedSequences) {
streamRecord.replace(Either.Right(matchedPattern));
http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index db17f6d..f90b670 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -57,29 +57,6 @@ public class CEPOperatorTest extends TestLogger {
public TemporaryFolder tempFolder = new TemporaryFolder();
@Test
- public void testCEPOperatorWatermarkForwarding() throws Exception {
- OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
- new CEPPatternOperator<>(
- Event.createTypeSerializer(),
- false,
- new NFAFactory())
- );
-
- harness.open();
-
- Watermark expectedWatermark = new Watermark(42L);
-
- harness.processWatermark(expectedWatermark);
-
- Object watermark = harness.getOutput().poll();
-
- assertTrue(watermark instanceof Watermark);
- assertEquals(expectedWatermark, watermark);
-
- harness.close();
- }
-
- @Test
public void testKeyedCEPOperatorWatermarkForwarding() throws Exception {
KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
private static final long serialVersionUID = -4873366487571254798L;
@@ -115,94 +92,6 @@ public class CEPOperatorTest extends TestLogger {
}
@Test
- public void testCEPOperatorCheckpointing() throws Exception {
- KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
- private static final long serialVersionUID = -4873366487571254798L;
-
- @Override
- public Integer getKey(Event value) throws Exception {
- return value.getId();
- }
- };
-
- OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
- new CEPPatternOperator<>(
- Event.createTypeSerializer(),
- false,
- new NFAFactory()));
-
- harness.open();
-
- Event startEvent = new Event(42, "start", 1.0);
- SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
- Event endEvent= new Event(42, "end", 1.0);
-
- harness.processElement(new StreamRecord<Event>(startEvent, 1));
- harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
-
- // simulate snapshot/restore with some elements in internal sorting queue
- StreamStateHandle snapshot = harness.snapshotLegacy(0, 0);
- harness.close();
-
- harness = new OneInputStreamOperatorTestHarness<>(
- new CEPPatternOperator<>(
- Event.createTypeSerializer(),
- false,
- new NFAFactory()));
-
- harness.setup();
- harness.restore(snapshot);
- harness.open();
-
- harness.processWatermark(new Watermark(Long.MIN_VALUE));
-
- harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
-
- // if element timestamps are not correctly checkpointed/restored this will lead to
- // a pruning time underflow exception in NFA
- harness.processWatermark(new Watermark(2));
-
- // simulate snapshot/restore with empty element queue but NFA state
- StreamStateHandle snapshot2 = harness.snapshotLegacy(1, 1);
- harness.close();
-
- harness = new OneInputStreamOperatorTestHarness<>(
- new CEPPatternOperator<>(
- Event.createTypeSerializer(),
- false,
- new NFAFactory()));
-
- harness.setup();
- harness.restore(snapshot2);
- harness.open();
-
- harness.processElement(new StreamRecord<Event>(middleEvent, 3));
- harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0), 4));
- harness.processElement(new StreamRecord<Event>(endEvent, 5));
-
- harness.processWatermark(new Watermark(Long.MAX_VALUE));
-
- ConcurrentLinkedQueue<Object> result = harness.getOutput();
-
- // watermark and the result
- assertEquals(2, result.size());
-
- Object resultObject = result.poll();
- assertTrue(resultObject instanceof StreamRecord);
- StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
- assertTrue(resultRecord.getValue() instanceof Map);
-
- @SuppressWarnings("unchecked")
- Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
-
- assertEquals(startEvent, patternMap.get("start"));
- assertEquals(middleEvent, patternMap.get("middle"));
- assertEquals(endEvent, patternMap.get("end"));
-
- harness.close();
- }
-
- @Test
public void testKeyedCEPOperatorCheckpointing() throws Exception {
KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
@@ -493,71 +382,6 @@ public class CEPOperatorTest extends TestLogger {
}
}
- /**
- * Tests that the internal time of a CEP operator advances only given watermarks. See FLINK-5033
- */
- @Test
- public void testAdvancingTimeWithoutElements() throws Exception {
- final Event startEvent = new Event(42, "start", 1.0);
- final long watermarkTimestamp1 = 5L;
- final long watermarkTimestamp2 = 13L;
-
- final Map<String, Event> expectedSequence = new HashMap<>(2);
- expectedSequence.put("start", startEvent);
-
- OneInputStreamOperatorTestHarness<Event, Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> harness = new OneInputStreamOperatorTestHarness<>(
- new TimeoutCEPPatternOperator<>(
- Event.createTypeSerializer(),
- false,
- new NFAFactory(true))
- );
-
- try {
- harness.setup(
- new KryoSerializer<>(
- (Class<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>>) (Object) Either.class,
- new ExecutionConfig()));
- harness.open();
-
- harness.processElement(new StreamRecord<>(startEvent, 3L));
- harness.processWatermark(new Watermark(watermarkTimestamp1));
- harness.processWatermark(new Watermark(watermarkTimestamp2));
-
- Queue<Object> result = harness.getOutput();
-
- assertEquals(3, result.size());
-
- Object watermark1 = result.poll();
-
- assertTrue(watermark1 instanceof Watermark);
-
- assertEquals(watermarkTimestamp1, ((Watermark) watermark1).getTimestamp());
-
- Object resultObject = result.poll();
-
- assertTrue(resultObject instanceof StreamRecord);
-
- StreamRecord<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> streamRecord = (StreamRecord<Either<Tuple2<Map<String,Event>,Long>,Map<String,Event>>>) resultObject;
-
- assertTrue(streamRecord.getValue() instanceof Either.Left);
-
- Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>> left = (Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>>) streamRecord.getValue();
-
- Tuple2<Map<String, Event>, Long> leftResult = left.left();
-
- assertEquals(watermarkTimestamp2, (long) leftResult.f1);
- assertEquals(expectedSequence, leftResult.f0);
-
- Object watermark2 = result.poll();
-
- assertTrue(watermark2 instanceof Watermark);
-
- assertEquals(watermarkTimestamp2, ((Watermark) watermark2).getTimestamp());
- } finally {
- harness.close();
- }
- }
-
private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
private static final long serialVersionUID = 1173020762472766713L;
http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 4f4546e..f883ef5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.NullByteKeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
@@ -1124,17 +1125,4 @@ public class AllWindowedStream<T, W extends Window> {
public TypeInformation<T> getInputType() {
return input.getType();
}
-
- /**
- * Used as dummy KeySelector to allow using WindowOperator for Non-Keyed Windows.
- * @param <T>
- */
- private static class NullByteKeySelector<T> implements KeySelector<T, Byte> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Byte getKey(T value) throws Exception {
- return 0;
- }
- }
}