You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/06/08 13:45:46 UTC
flink git commit: [FLINK-6772] [cep] Fix ordering (by timestamp) of
matched events.
Repository: flink
Updated Branches:
refs/heads/master bcaf816dc -> 5d3506e88
[FLINK-6772] [cep] Fix ordering (by timestamp) of matched events.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d3506e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d3506e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d3506e8
Branch: refs/heads/master
Commit: 5d3506e88f24ec0d1c2272f04570c745d319329b
Parents: bcaf816
Author: kkloudas <kk...@gmail.com>
Authored: Wed May 31 17:48:34 2017 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Jun 8 15:41:38 2017 +0200
----------------------------------------------------------------------
.../main/java/org/apache/flink/cep/nfa/NFA.java | 88 +++++++++++---------
.../org/apache/flink/cep/nfa/SharedBuffer.java | 26 +++---
.../flink/cep/nfa/compiler/NFACompiler.java | 45 ++--------
.../cep/nfa/compiler/NFAStateNameHandler.java | 79 ++++++++++++++++++
.../org/apache/flink/cep/nfa/NFAITCase.java | 64 ++++++++++++++
.../apache/flink/cep/nfa/SharedBufferTest.java | 73 +++++++++-------
.../flink/cep/nfa/compiler/NFACompilerTest.java | 2 +-
7 files changed, 255 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index f438915..cac1601 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.cep.NonDuplicatingTypeSerializer;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -43,7 +44,6 @@ import org.apache.flink.util.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
-import com.google.common.collect.ListMultimap;
import javax.annotation.Nullable;
@@ -231,7 +231,7 @@ public class NFA<T> implements Serializable {
}
eventSharedBuffer.release(
- computationState.getPreviousState().getName(),
+ NFAStateNameHandler.getOriginalNameFromInternal(computationState.getPreviousState().getName()),
computationState.getEvent(),
computationState.getTimestamp(),
computationState.getCounter());
@@ -248,6 +248,7 @@ public class NFA<T> implements Serializable {
//if stop state reached in this path
boolean shouldDiscardPath = false;
for (final ComputationState<T> newComputationState: newComputationStates) {
+
if (newComputationState.isFinalState()) {
// we've reached a final state and can thus retrieve the matching event sequence
Map<String, List<T>> matchedPattern = extractCurrentMatches(newComputationState);
@@ -255,7 +256,8 @@ public class NFA<T> implements Serializable {
// remove found patterns because they are no longer needed
eventSharedBuffer.release(
- newComputationState.getPreviousState().getName(),
+ NFAStateNameHandler.getOriginalNameFromInternal(
+ newComputationState.getPreviousState().getName()),
newComputationState.getEvent(),
newComputationState.getTimestamp(),
computationState.getCounter());
@@ -263,10 +265,11 @@ public class NFA<T> implements Serializable {
//reached stop state. release entry for the stop state
shouldDiscardPath = true;
eventSharedBuffer.release(
- newComputationState.getPreviousState().getName(),
- newComputationState.getEvent(),
- newComputationState.getTimestamp(),
- computationState.getCounter());
+ NFAStateNameHandler.getOriginalNameFromInternal(
+ newComputationState.getPreviousState().getName()),
+ newComputationState.getEvent(),
+ newComputationState.getTimestamp(),
+ computationState.getCounter());
} else {
// add new computation state; it will be processed once the next event arrives
statesToRetain.add(newComputationState);
@@ -278,10 +281,11 @@ public class NFA<T> implements Serializable {
// the buffer
for (final ComputationState<T> state : statesToRetain) {
eventSharedBuffer.release(
- state.getPreviousState().getName(),
- state.getEvent(),
- state.getTimestamp(),
- state.getCounter());
+ NFAStateNameHandler.getOriginalNameFromInternal(
+ state.getPreviousState().getName()),
+ state.getEvent(),
+ state.getTimestamp(),
+ state.getCounter());
}
} else {
computationStates.addAll(statesToRetain);
@@ -473,17 +477,20 @@ public class NFA<T> implements Serializable {
if (computationState.isStartState()) {
startTimestamp = timestamp;
counter = eventSharedBuffer.put(
- currentState.getName(),
+ NFAStateNameHandler.getOriginalNameFromInternal(
+ currentState.getName()),
event,
timestamp,
currentVersion);
} else {
startTimestamp = computationState.getStartTimestamp();
counter = eventSharedBuffer.put(
- currentState.getName(),
+ NFAStateNameHandler.getOriginalNameFromInternal(
+ currentState.getName()),
event,
timestamp,
- previousState.getName(),
+ NFAStateNameHandler.getOriginalNameFromInternal(
+ previousState.getName()),
previousEvent,
computationState.getTimestamp(),
computationState.getCounter(),
@@ -530,10 +537,11 @@ public class NFA<T> implements Serializable {
if (computationState.getEvent() != null) {
// release the shared entry referenced by the current computation state.
eventSharedBuffer.release(
- computationState.getPreviousState().getName(),
- computationState.getEvent(),
- computationState.getTimestamp(),
- computationState.getCounter());
+ NFAStateNameHandler.getOriginalNameFromInternal(
+ computationState.getPreviousState().getName()),
+ computationState.getEvent(),
+ computationState.getTimestamp(),
+ computationState.getCounter());
}
return resultingComputationStates;
@@ -551,7 +559,9 @@ public class NFA<T> implements Serializable {
ComputationState<T> computationState = ComputationState.createState(
this, currentState, previousState, event, counter, timestamp, version, startTimestamp);
computationStates.add(computationState);
- eventSharedBuffer.lock(previousState.getName(), event, timestamp, counter);
+
+ String originalStateName = NFAStateNameHandler.getOriginalNameFromInternal(previousState.getName());
+ eventSharedBuffer.lock(originalStateName, event, timestamp, counter);
}
private State<T> findFinalStateAfterProceed(State<T> state, T event, ComputationState<T> computationState) {
@@ -641,32 +651,34 @@ public class NFA<T> implements Serializable {
eventSerializer = nonDuplicatingTypeSerializer.getTypeSerializer();
}
- Collection<ListMultimap<String, T>> paths = eventSharedBuffer.extractPatterns(
- computationState.getPreviousState().getName(),
+ List<Map<String, List<T>>> paths = eventSharedBuffer.extractPatterns(
+ NFAStateNameHandler.getOriginalNameFromInternal(
+ computationState.getPreviousState().getName()),
computationState.getEvent(),
computationState.getTimestamp(),
computationState.getCounter(),
computationState.getVersion());
+ if (paths.isEmpty()) {
+ return new HashMap<>();
+ }
// for a given computation state, we cannot have more than one matching patterns.
- Preconditions.checkState(paths.size() <= 1);
+ Preconditions.checkState(paths.size() == 1);
Map<String, List<T>> result = new HashMap<>();
- for (ListMultimap<String, T> path: paths) {
- for (String key: path.keySet()) {
- List<T> events = path.get(key);
-
- String originalKey = NFACompiler.getOriginalStateNameFromInternal(key);
- List<T> values = result.get(originalKey);
- if (values == null) {
- values = new ArrayList<>(events.size());
- }
+ Map<String, List<T>> path = paths.get(0);
+ for (String key: path.keySet()) {
+ List<T> events = path.get(key);
+
+ List<T> values = result.get(key);
+ if (values == null) {
+ values = new ArrayList<>(events.size());
+ result.put(key, values);
+ }
- for (T event: events) {
- // copy the element so that the user can change it
- values.add(eventSerializer.isImmutableType() ? event : eventSerializer.copy(event));
- }
- result.put(originalKey, values);
+ for (T event: events) {
+ // copy the element so that the user can change it
+ values.add(eventSerializer.isImmutableType() ? event : eventSerializer.copy(event));
}
}
return result;
@@ -871,10 +883,6 @@ public class NFA<T> implements Serializable {
return null;
}
- private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
- ois.defaultReadObject();
- }
-
@Override
public NFA<T> copy(NFA<T> from) {
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index a44b333..d592c65 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -11,7 +11,7 @@
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
- * WIVHOUV WARRANVIES OR CONDIVIONS OF ANY KIND, either express or implied.
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@@ -34,8 +34,6 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
import org.apache.commons.lang3.StringUtils;
import java.io.ByteArrayInputStream;
@@ -217,14 +215,14 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
* @param version Version of the previous relation which shall be extracted
* @return Collection of previous relations starting with the given value
*/
- public Collection<ListMultimap<K, V>> extractPatterns(
+ public List<Map<K, List<V>>> extractPatterns(
final K key,
final V value,
final long timestamp,
final int counter,
final DeweyNumber version) {
- Collection<ListMultimap<K, V>> result = new ArrayList<>();
+ List<Map<K, List<V>>> result = new ArrayList<>();
// stack to remember the current extraction states
Stack<ExtractionState<K, V>> extractionStates = new Stack<>();
@@ -244,12 +242,18 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
// termination criterion
if (currentEntry == null) {
- final ListMultimap<K, V> completePath = ArrayListMultimap.create();
+ final Map<K, List<V>> completePath = new HashMap<>();
while (!currentPath.isEmpty()) {
final SharedBufferEntry<K, V> currentPathEntry = currentPath.pop();
- completePath.put(currentPathEntry.getKey(), currentPathEntry.getValueTime().getValue());
+ K k = currentPathEntry.getKey();
+ List<V> values = completePath.get(k);
+ if (values == null) {
+ values = new ArrayList<>();
+ completePath.put(k, values);
+ }
+ values.add(currentPathEntry.getValueTime().getValue());
}
result.add(completePath);
@@ -777,12 +781,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
ExtractionState(
final SharedBufferEntry<K, V> entry,
- final DeweyNumber version) {
- this(entry, version, null);
- }
-
- ExtractionState(
- final SharedBufferEntry<K, V> entry,
final DeweyNumber version,
final Stack<SharedBufferEntry<K, V>> path) {
this.entry = entry;
@@ -942,7 +940,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();
- valueSerializer.serialize(valueTimeWrapper.value, target);
+ valueSerializer.serialize(valueTimeWrapper.getValue(), target);
target.writeLong(valueTimeWrapper.getTimestamp());
target.writeInt(valueTimeWrapper.getCounter());
http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 8d1d366..ce42acd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -32,7 +32,6 @@ import org.apache.flink.cep.pattern.conditions.BooleanConditions;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.NotCondition;
import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
@@ -44,10 +43,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
* Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a
@@ -57,8 +54,6 @@ public class NFACompiler {
protected static final String ENDING_STATE_NAME = "$endState$";
- protected static final String STATE_NAME_DELIM = ":";
-
/**
* Compiles the given pattern into a {@link NFA}.
*
@@ -77,11 +72,6 @@ public class NFACompiler {
return factory.createNFA();
}
- public static String getOriginalStateNameFromInternal(String internalName) {
- Preconditions.checkNotNull(internalName);
- return internalName.split(STATE_NAME_DELIM)[0];
- }
-
/**
* Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create
* multiple NFAs.
@@ -115,7 +105,7 @@ public class NFACompiler {
*/
static class NFAFactoryCompiler<T> {
- private final Set<String> usedNames = new HashSet<>();
+ private final NFAStateNameHandler stateNameHandler = new NFAStateNameHandler();
private final Map<String, State<T>> stopStates = new HashMap<>();
private final List<State<T>> states = new ArrayList<>();
@@ -207,7 +197,8 @@ public class NFACompiler {
if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
//skip notFollow patterns, they are converted into edge conditions
} else if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_NEXT) {
- checkPatternNameUniqueness(currentPattern.getName());
+ stateNameHandler.checkNameUniqueness(currentPattern.getName());
+
final State<T> notNext = createState(currentPattern.getName(), State.StateType.Normal);
final IterativeCondition<T> notCondition = (IterativeCondition<T>) currentPattern.getCondition();
final State<T> stopState = createStopState(notCondition, currentPattern.getName());
@@ -221,7 +212,7 @@ public class NFACompiler {
notNext.addProceed(stopState, notCondition);
lastSink = notNext;
} else {
- checkPatternNameUniqueness(currentPattern.getName());
+ stateNameHandler.checkNameUniqueness(currentPattern.getName());
lastSink = convertPattern(lastSink);
}
@@ -246,7 +237,7 @@ public class NFACompiler {
*/
@SuppressWarnings("unchecked")
private State<T> createStartState(State<T> sinkState) {
- checkPatternNameUniqueness(currentPattern.getName());
+ stateNameHandler.checkNameUniqueness(currentPattern.getName());
final State<T> beginningState = convertPattern(sinkState);
beginningState.makeStart();
return beginningState;
@@ -284,36 +275,12 @@ public class NFACompiler {
* @return the created state
*/
private State<T> createState(String name, State.StateType stateType) {
- String stateName = getUniqueInternalStateName(name);
- usedNames.add(stateName);
+ String stateName = stateNameHandler.getUniqueInternalName(name);
State<T> state = new State<>(stateName, stateType);
states.add(state);
return state;
}
- /**
- * Used to give a unique name to states created
- * during the translation process.
- *
- * @param baseName The base of the name.
- */
- private String getUniqueInternalStateName(String baseName) {
- int counter = 0;
- String candidate = baseName;
- while (usedNames.contains(candidate)) {
- candidate = baseName + STATE_NAME_DELIM + counter++;
- }
- return candidate;
- }
-
- private void checkPatternNameUniqueness(String patternName) {
- if (usedNames.contains(patternName)) {
- throw new MalformedPatternException(
- "Duplicate pattern name: " + patternName + ". " +
- "Pattern names must be unique.");
- }
- }
-
private State<T> createStopState(final IterativeCondition<T> notCondition, final String name) {
// We should not duplicate the notStates. All states from which we can stop should point to the same one.
State<T> stopState = stopStates.get(name);
http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
new file mode 100644
index 0000000..558b6f4
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.compiler;
+
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A utility class used to handle name conventions and guarantee unique
+ * names for the states of our {@link org.apache.flink.cep.nfa.NFA}.
+ */
+public class NFAStateNameHandler {
+
+ private static final String STATE_NAME_DELIM = ":";
+
+ private final Set<String> usedNames = new HashSet<>();
+
+ /**
+ * Implements the reverse process of the {@link #getUniqueInternalName(String)}.
+ *
+ * @param internalName The name to be decoded.
+ * @return The original, user-specified name for the state.
+ */
+ public static String getOriginalNameFromInternal(String internalName) {
+ Preconditions.checkNotNull(internalName);
+ return internalName.split(STATE_NAME_DELIM)[0];
+ }
+
+ /**
+ * Checks if the given name is already used or not. If yes, it
+ * throws a {@link MalformedPatternException}.
+ *
+ * @param name The name to be checked.
+ */
+ public void checkNameUniqueness(String name) {
+ if (usedNames.contains(name)) {
+ throw new MalformedPatternException("Duplicate pattern name: " + name + ". Names must be unique.");
+ }
+ }
+
+ /**
+ * Used to give a unique name to {@link org.apache.flink.cep.nfa.NFA} states
+ * created during the translation process. The name format will be
+ * {@code baseName:counter} , where the counter is increasing for states with
+ * the same {@code baseName}.
+ *
+ * @param baseName The base of the name.
+ * @return The (unique) name that is going to be used internally for the state.
+ */
+ public String getUniqueInternalName(String baseName) {
+ int counter = 0;
+ String candidate = baseName;
+ while (usedNames.contains(candidate)) {
+ candidate = baseName + STATE_NAME_DELIM + counter++;
+ }
+ usedNames.add(candidate);
+ return candidate;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 92b49d3..20cb482 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
import com.google.common.collect.Lists;
+import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
@@ -2622,4 +2623,67 @@ public class NFAITCase extends TestLogger {
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end)
));
}
+
+ @Test
+ public void testNFAResultOrdering() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent1 = new Event(41, "a-1", 2.0);
+ Event startEvent2 = new Event(41, "a-2", 3.0);
+ Event startEvent3 = new Event(41, "a-3", 4.0);
+ Event startEvent4 = new Event(41, "a-4", 5.0);
+ Event endEvent1 = new Event(41, "b-1", 6.0);
+ Event endEvent2 = new Event(41, "b-2", 7.0);
+ Event endEvent3 = new Event(41, "b-3", 8.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent1, 1));
+ inputEvents.add(new StreamRecord<>(startEvent2, 3));
+ inputEvents.add(new StreamRecord<>(startEvent3, 4));
+ inputEvents.add(new StreamRecord<>(startEvent4, 5));
+ inputEvents.add(new StreamRecord<>(endEvent1, 6));
+ inputEvents.add(new StreamRecord<>(endEvent2, 7));
+ inputEvents.add(new StreamRecord<>(endEvent3, 10));
+
+ Pattern<Event, ?> pattern = Pattern
+ .<Event>begin("start")
+ .where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 6452194090480345053L;
+
+ @Override
+ public boolean filter(Event s) throws Exception {
+ return s.getName().startsWith("a-");
+ }
+ }).times(4).allowCombinations()
+ .followedByAny("middle")
+ .where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -6838398439317275390L;
+
+ public boolean filter(Event s) throws Exception {
+ return s.getName().startsWith("b-");
+ }
+ }).times(3).consecutive();
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, List<Event>>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ resultingPatterns.addAll(patterns);
+ }
+
+ Assert.assertEquals(1L, resultingPatterns.size());
+
+ Map<String, List<Event>> match = resultingPatterns.get(0);
+ Assert.assertArrayEquals(
+ match.get("start").toArray(),
+ Lists.newArrayList(startEvent1, startEvent2, startEvent3, startEvent4).toArray());
+
+ Assert.assertArrayEquals(
+ match.get("middle").toArray(),
+ Lists.newArrayList(endEvent1, endEvent2, endEvent3).toArray());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index 44033c1..3621bad 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -24,15 +24,17 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.TestLogger;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -54,28 +56,43 @@ public class SharedBufferTest extends TestLogger {
events[i] = new Event(i + 1, "e" + (i + 1), i);
}
- ListMultimap<String, Event> expectedPattern1 = ArrayListMultimap.create();
- expectedPattern1.put("a1", events[2]);
- expectedPattern1.put("a[]", events[3]);
- expectedPattern1.put("b", events[5]);
-
- ListMultimap<String, Event> expectedPattern2 = ArrayListMultimap.create();
- expectedPattern2.put("a1", events[0]);
- expectedPattern2.put("a[]", events[1]);
- expectedPattern2.put("a[]", events[2]);
- expectedPattern2.put("a[]", events[3]);
- expectedPattern2.put("a[]", events[4]);
- expectedPattern2.put("b", events[5]);
-
- ListMultimap<String, Event> expectedPattern3 = ArrayListMultimap.create();
- expectedPattern3.put("a1", events[0]);
- expectedPattern3.put("a[]", events[1]);
- expectedPattern3.put("a[]", events[2]);
- expectedPattern3.put("a[]", events[3]);
- expectedPattern3.put("a[]", events[4]);
- expectedPattern3.put("a[]", events[5]);
- expectedPattern3.put("a[]", events[6]);
- expectedPattern3.put("b", events[7]);
+ Map<String, List<Event>> expectedPattern1 = new HashMap<>();
+ expectedPattern1.put("a1", new ArrayList<Event>());
+ expectedPattern1.get("a1").add(events[2]);
+
+ expectedPattern1.put("a[]", new ArrayList<Event>());
+ expectedPattern1.get("a[]").add(events[3]);
+
+ expectedPattern1.put("b", new ArrayList<Event>());
+ expectedPattern1.get("b").add(events[5]);
+
+ Map<String, List<Event>> expectedPattern2 = new HashMap<>();
+ expectedPattern2.put("a1", new ArrayList<Event>());
+ expectedPattern2.get("a1").add(events[0]);
+
+ expectedPattern2.put("a[]", new ArrayList<Event>());
+ expectedPattern2.get("a[]").add(events[1]);
+ expectedPattern2.get("a[]").add(events[2]);
+ expectedPattern2.get("a[]").add(events[3]);
+ expectedPattern2.get("a[]").add(events[4]);
+
+ expectedPattern2.put("b", new ArrayList<Event>());
+ expectedPattern2.get("b").add(events[5]);
+
+ Map<String, List<Event>> expectedPattern3 = new HashMap<>();
+ expectedPattern3.put("a1", new ArrayList<Event>());
+ expectedPattern3.get("a1").add(events[0]);
+
+ expectedPattern3.put("a[]", new ArrayList<Event>());
+ expectedPattern3.get("a[]").add(events[1]);
+ expectedPattern3.get("a[]").add(events[2]);
+ expectedPattern3.get("a[]").add(events[3]);
+ expectedPattern3.get("a[]").add(events[4]);
+ expectedPattern3.get("a[]").add(events[5]);
+ expectedPattern3.get("a[]").add(events[6]);
+
+ expectedPattern3.put("b", new ArrayList<Event>());
+ expectedPattern3.get("b").add(events[7]);
sharedBuffer.put("a1", events[0], timestamp, null, null, 0, 0, DeweyNumber.fromString("1"));
sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, 0, DeweyNumber.fromString("1.0"));
@@ -90,12 +107,12 @@ public class SharedBufferTest extends TestLogger {
sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, 5, DeweyNumber.fromString("1.1"));
sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, 6, DeweyNumber.fromString("1.1.0"));
- Collection<ListMultimap<String, Event>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
+ Collection<Map<String, List<Event>>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
sharedBuffer.release("b", events[7], timestamp, 7);
- Collection<ListMultimap<String, Event>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
+ Collection<Map<String, List<Event>>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
- Collection<ListMultimap<String, Event>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, 2, DeweyNumber.fromString("2.0.0"));
- Collection<ListMultimap<String, Event>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, 5, DeweyNumber.fromString("1.0.0"));
+ Collection<Map<String, List<Event>>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, 2, DeweyNumber.fromString("2.0.0"));
+ Collection<Map<String, List<Event>>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, 5, DeweyNumber.fromString("1.0.0"));
sharedBuffer.release("b", events[5], timestamp, 2);
sharedBuffer.release("b", events[5], timestamp, 5);
http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index cd12071..6d4329a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -80,7 +80,7 @@ public class NFACompilerTest extends TestLogger {
// adjust the rule
expectedException.expect(MalformedPatternException.class);
- expectedException.expectMessage("Duplicate pattern name: start. Pattern names must be unique.");
+ expectedException.expectMessage("Duplicate pattern name: start. Names must be unique.");
Pattern<Event, ?> invalidPattern = Pattern.<Event>begin("start").where(new TestFilter())
.followedBy("middle").where(new TestFilter())