You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2017/05/23 08:23:57 UTC
flink git commit: [FLINK-6656] [cep] Change element PriorityQueue to
MapState.
Repository: flink
Updated Branches:
refs/heads/release-1.3 30fba2b1a -> 2bd082f82
[FLINK-6656] [cep] Change element PriorityQueue to MapState.
This is to leverage the fact that RocksDB already returns the
keys sorted. So now, instead of the elements being stored in a PQ
and all of them being deserialized and serialized at each incoming
element, they are stored in a MapState with the key being the
timestamp and the value being a List of the elements that have
the same timestamp.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bd082f8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bd082f8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bd082f8
Branch: refs/heads/release-1.3
Commit: 2bd082f823eca13bf1ece01d39d279365cea7180
Parents: 30fba2b
Author: kkloudas <kk...@gmail.com>
Authored: Mon May 22 11:43:42 2017 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue May 23 10:23:41 2017 +0200
----------------------------------------------------------------------
.../AbstractKeyedCEPPatternOperator.java | 152 +++++++++++++------
.../cep/operator/CEPMigration11to13Test.java | 7 +-
.../flink/cep/operator/CEPOperatorTest.java | 19 +--
.../src/test/resources/cep-keyed-1_1-snapshot | Bin 5612 -> 5674 bytes
.../test/resources/cep-non-keyed-1.1-snapshot | Bin 3274 -> 3336 bytes
5 files changed, 118 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2bd082f8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7b6e5e3..af4b53e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -19,6 +19,8 @@
package org.apache.flink.cep.operator;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
@@ -29,6 +31,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.core.fs.FSDataInputStream;
@@ -54,6 +58,10 @@ import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
@@ -86,12 +94,11 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
/////////////// State //////////////
private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorStateName";
- private static final String PRIORITY_QUEUE_STATE_NAME = "priorityQueueStateName";
+ private static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";
private transient ValueState<NFA<IN>> nfaOperatorState;
- private transient ValueState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState;
+ private transient MapState<Long, List<IN>> elementQueueState;
- private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
private final NFACompiler.NFAFactory<IN> nfaFactory;
private transient InternalTimerService<VoidNamespace> timerService;
@@ -134,19 +141,13 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
new NFA.NFASerializer<>(inputSerializer)));
}
- @SuppressWarnings("unchecked,rawtypes")
- TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
- (TypeSerializer) new StreamElementSerializer<>(inputSerializer);
-
- if (priorityQueueOperatorState == null) {
- priorityQueueOperatorState = getRuntimeContext().getState(
- new ValueStateDescriptor<>(
- PRIORITY_QUEUE_STATE_NAME,
- new PriorityQueueSerializer<>(
- streamRecordSerializer,
- new PriorityQueueStreamRecordFactory<IN>()
+ if (elementQueueState == null) {
+ elementQueueState = getRuntimeContext().getMapState(
+ new MapStateDescriptor<>(
+ EVENT_QUEUE_STATE_NAME,
+ LongSerializer.INSTANCE,
+ new ListSerializer<>(inputSerializer)
)
- )
);
}
}
@@ -171,25 +172,32 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
} else {
+ long timestamp = element.getTimestamp();
+ IN value = element.getValue();
+
// In event-time processing we assume correctness of the watermark.
// Events with timestamp smaller than the last seen watermark are considered late.
// Late events are put in a dedicated side output, if the user has specified one.
- if (element.getTimestamp() >= lastWatermark) {
+ if (timestamp >= lastWatermark) {
// we have an event with a valid timestamp, so
// we buffer it until we receive the proper watermark.
saveRegisterWatermarkTimer();
- PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+ List<IN> elementsForTimestamp = elementQueueState.get(timestamp);
+ if (elementsForTimestamp == null) {
+ elementsForTimestamp = new ArrayList<>();
+ }
+
if (getExecutionConfig().isObjectReuseEnabled()) {
// copy the StreamRecord so that it cannot be changed
- priorityQueue.offer(new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
+ elementsForTimestamp.add(inputSerializer.copy(value));
} else {
- priorityQueue.offer(element);
+ elementsForTimestamp.add(element.getValue());
}
- updatePriorityQueue(priorityQueue);
+ elementQueueState.put(timestamp, elementsForTimestamp);
}
}
}
@@ -218,23 +226,28 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
// 5) update the last seen watermark.
// STEP 1
- PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+ PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
NFA<IN> nfa = getNFA();
// STEP 2
- while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= timerService.currentWatermark()) {
- StreamRecord<IN> streamRecord = priorityQueue.poll();
- processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+ while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
+ long timestamp = sortedTimestamps.poll();
+ for (IN element: elementQueueState.get(timestamp)) {
+ processEvent(nfa, element, timestamp);
+ }
+ elementQueueState.remove(timestamp);
}
// STEP 3
advanceTime(nfa, timerService.currentWatermark());
// STEP 4
- updatePriorityQueue(priorityQueue);
+ if (sortedTimestamps.isEmpty()) {
+ elementQueueState.clear();
+ }
updateNFA(nfa);
- if (!priorityQueue.isEmpty() || !nfa.isEmpty()) {
+ if (!sortedTimestamps.isEmpty() || !nfa.isEmpty()) {
saveRegisterWatermarkTimer();
}
@@ -264,17 +277,12 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
}
}
- private PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
- PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueOperatorState.value();
- return priorityQueue != null ? priorityQueue : priorityQueueFactory.createPriorityQueue();
- }
-
- private void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException {
- if (queue.isEmpty()) {
- priorityQueueOperatorState.clear();
- } else {
- priorityQueueOperatorState.update(queue);
+ private PriorityQueue<Long> getSortedTimestamps() throws Exception {
+ PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>();
+ for (Long timestamp: elementQueueState.keys()) {
+ sortedTimestamps.offer(timestamp);
}
+ return sortedTimestamps;
}
/**
@@ -318,6 +326,18 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
ValueState<NFA<IN>> oldNfaOperatorState = getRuntimeContext().getState(
new ValueStateDescriptor<>("nfaOperatorState", new NFA.Serializer<IN>()));
+ ValueState<PriorityQueue<StreamRecord<IN>>> oldPriorityQueueOperatorState =
+ getRuntimeContext().getState(
+ new ValueStateDescriptor<>(
+ "priorityQueueStateName",
+ new PriorityQueueSerializer<>(
+ ((TypeSerializer) new StreamElementSerializer<>(inputSerializer)),
+ new PriorityQueueStreamRecordFactory<IN>()
+ )
+ )
+ );
+
+
if (migratingFromOldKeyedOperator) {
int numberEntries = inputView.readInt();
for (int i = 0; i < numberEntries; i++) {
@@ -328,6 +348,30 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
NFA<IN> nfa = oldNfaOperatorState.value();
oldNfaOperatorState.clear();
nfaOperatorState.update(nfa);
+
+ PriorityQueue<StreamRecord<IN>> priorityQueue = oldPriorityQueueOperatorState.value();
+ if (priorityQueue != null && !priorityQueue.isEmpty()) {
+ Map<Long, List<IN>> elementMap = new HashMap<>();
+ for (StreamRecord<IN> record: priorityQueue) {
+ long timestamp = record.getTimestamp();
+ IN element = record.getValue();
+
+ List<IN> elements = elementMap.get(timestamp);
+ if (elements == null) {
+ elements = new ArrayList<>();
+ elementMap.put(timestamp, elements);
+ }
+ elements.add(element);
+ }
+
+ // write the old state into the new one.
+ for (Map.Entry<Long, List<IN>> entry: elementMap.entrySet()) {
+ elementQueueState.put(entry.getKey(), entry.getValue());
+ }
+
+ // clear the old state
+ oldPriorityQueueOperatorState.clear();
+ }
}
} else {
@@ -339,22 +383,35 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
// retrieve the elements that were pending in the priority queue
MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
- PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueFactory.createPriorityQueue();
+
+ Map<Long, List<IN>> elementMap = new HashMap<>();
int entries = ois.readInt();
for (int i = 0; i < entries; i++) {
StreamElement streamElement = recordSerializer.deserialize(inputView);
- priorityQueue.offer(streamElement.<IN>asRecord());
+ StreamRecord<IN> record = streamElement.<IN>asRecord();
+
+ long timestamp = record.getTimestamp();
+ IN element = record.getValue();
+
+ List<IN> elements = elementMap.get(timestamp);
+ if (elements == null) {
+ elements = new ArrayList<>();
+ elementMap.put(timestamp, elements);
+ }
+ elements.add(element);
}
// finally register the retrieved state with the new keyed state.
setCurrentKey((byte) 0);
nfaOperatorState.update(nfa);
- priorityQueueOperatorState.update(priorityQueue);
+
+ // write the priority queue to the new map state.
+ for (Map.Entry<Long, List<IN>> entry: elementMap.entrySet()) {
+ elementQueueState.put(entry.getKey(), entry.getValue());
+ }
if (!isProcessingTime) {
// this is relevant only for event/ingestion time
-
- // need to work around type restrictions
setCurrentKey((byte) 0);
saveRegisterWatermarkTimer();
}
@@ -546,15 +603,18 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
}
@VisibleForTesting
- public boolean hasNonEmptyPQ(KEY key) throws IOException {
+ public boolean hasNonEmptyPQ(KEY key) throws Exception {
setCurrentKey(key);
- return priorityQueueOperatorState.value() != null;
+ return elementQueueState.keys().iterator().hasNext();
}
@VisibleForTesting
- public int getPQSize(KEY key) throws IOException {
+ public int getPQSize(KEY key) throws Exception {
setCurrentKey(key);
- PriorityQueue<StreamRecord<IN>> pq = getPriorityQueue();
- return pq == null ? -1 : pq.size();
+ int counter = 0;
+ for (List<IN> elements: elementQueueState.values()) {
+ counter += elements.size();
+ }
+ return counter;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2bd082f8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 255b8c2..d575e43 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -87,6 +87,9 @@ public class CEPMigration11to13Test {
harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
harness.processWatermark(new Watermark(2));
+
+ harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+
// simulate snapshot/restore with empty element queue but NFA state
StreamTaskState snapshot = harness.snapshot(1, 1);
FileOutputStream out = new FileOutputStream(
@@ -112,7 +115,6 @@ public class CEPMigration11to13Test {
harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot"));
harness.open();
- harness.processElement(new StreamRecord<Event>(middleEvent, 3));
harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
harness.processElement(new StreamRecord<>(endEvent, 5));
@@ -206,6 +208,8 @@ public class CEPMigration11to13Test {
harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
harness.processWatermark(new Watermark(2));
+ harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+
// simulate snapshot/restore with empty element queue but NFA state
StreamTaskState snapshot = harness.snapshot(1, 1);
FileOutputStream out = new FileOutputStream(
@@ -233,7 +237,6 @@ public class CEPMigration11to13Test {
harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-1.1-snapshot"));
harness.open();
- harness.processElement(new StreamRecord<Event>(middleEvent, 3));
harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
harness.processElement(new StreamRecord<>(endEvent, 5));
http://git-wip-us.apache.org/repos/asf/flink/blob/2bd082f8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 95e3a37..ab63479 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -43,7 +43,6 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.types.Either;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -140,8 +139,6 @@ public class CEPOperatorTest extends TestLogger {
}
@Test
- @Ignore
- // TODO: 5/19/17 Re-instate when checkpoints are fixed
public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception {
String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
@@ -306,11 +303,11 @@ public class CEPOperatorTest extends TestLogger {
harness.processWatermark(new Watermark(Long.MIN_VALUE));
- harness.processElement(new StreamRecord<>(startEvent1, 1L));
- harness.processElement(new StreamRecord<>(startEventK2, 1L));
harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
+ harness.processElement(new StreamRecord<>(startEvent1, 1L));
+ harness.processElement(new StreamRecord<>(startEventK2, 1L));
// there must be 2 keys 42, 43 registered for the watermark callback
// all the seen elements must be in the priority queues but no NFA yet.
@@ -404,13 +401,13 @@ public class CEPOperatorTest extends TestLogger {
harness.processWatermark(new Watermark(Long.MIN_VALUE));
+ harness.processElement(new StreamRecord<>(middle2Event1, 6));
+ harness.processElement(new StreamRecord<>(middle1Event3, 7));
harness.processElement(new StreamRecord<>(startEvent, 1));
harness.processElement(new StreamRecord<>(middle1Event1, 3));
- harness.processElement(new StreamRecord<>(middle1Event1, 3)); // this and the following get reordered
harness.processElement(new StreamRecord<>(middle1Event2, 3));
+ harness.processElement(new StreamRecord<>(middle1Event1, 3));
harness.processElement(new StreamRecord<>(new Event(41, "d", 6.0), 5));
- harness.processElement(new StreamRecord<>(middle2Event1, 6));
- harness.processElement(new StreamRecord<>(middle1Event3, 7));
assertEquals(1L, harness.numEventTimeTimers());
assertEquals(7L, operator.getPQSize(41));
@@ -554,8 +551,6 @@ public class CEPOperatorTest extends TestLogger {
}
@Test
- @Ignore
- // TODO: 5/19/17 Re-instate when checkpoints are fixed
public void testCEPOperatorSerializationWRocksDB() throws Exception {
String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
@@ -626,13 +621,13 @@ public class CEPOperatorTest extends TestLogger {
harness.processElement(new StreamRecord<>(startEvent1, 1));
harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
harness.processWatermark(2L);
+ harness.processElement(new StreamRecord<Event>(middleEvent3, 5));
harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
harness.processElement(new StreamRecord<>(startEvent2, 4));
- harness.processElement(new StreamRecord<Event>(middleEvent3, 5));
harness.processWatermark(5L);
- harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
harness.processElement(new StreamRecord<>(nextOne, 6));
harness.processElement(new StreamRecord<>(endEvent, 8));
+ harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
harness.processWatermark(100L);
List<List<Event>> resultingPatterns = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/2bd082f8/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot
index 277de1d..75655c6 100644
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot and b/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/2bd082f8/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot
index b5ca51e..68ca0ec 100644
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot and b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot differ