Posted to commits@flink.apache.org by rm...@apache.org on 2017/05/23 08:23:57 UTC

flink git commit: [FLINK-6656] [cep] Change element PriorityQueue to MapState.

Repository: flink
Updated Branches:
  refs/heads/release-1.3 30fba2b1a -> 2bd082f82


[FLINK-6656] [cep] Change element PriorityQueue to MapState.

This is to leverage the fact that RocksDB already returns the
keys sorted. So now, instead of the elements being stored in a
PriorityQueue that is fully deserialized and re-serialized on
every incoming element, they are stored in a MapState whose key
is the timestamp and whose value is a List of the elements that
carry that timestamp.
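
For readers skimming the diff, the essence of the change is sketched below
in plain Java. This is an illustrative sketch only; the class, method, and
interface names (TimestampBufferSketch, buffer, drainUpTo, EventConsumer)
are hypothetical and not part of the commit. The actual code is in
AbstractKeyedCEPPatternOperator.java further down. Each incoming element is
appended to the list stored under its timestamp, and on a watermark the
buffered timestamps are sorted locally and the corresponding buckets are
drained in order.

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.common.typeutils.base.ListSerializer;
    import org.apache.flink.api.common.typeutils.base.LongSerializer;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.PriorityQueue;

    // Hypothetical helper class for illustration; not part of the commit.
    class TimestampBufferSketch<IN> {

        private final MapState<Long, List<IN>> elementQueueState;

        TimestampBufferSketch(MapState<Long, List<IN>> elementQueueState) {
            this.elementQueueState = elementQueueState;
        }

        // Descriptor mirroring the one registered in the operator: the map key
        // is the element timestamp, the value a list of elements with that timestamp.
        static <T> MapStateDescriptor<Long, List<T>> descriptor(TypeSerializer<T> inputSerializer) {
            return new MapStateDescriptor<>(
                    "eventQueuesStateName",
                    LongSerializer.INSTANCE,
                    new ListSerializer<>(inputSerializer));
        }

        // Buffering touches only the bucket for this timestamp, instead of
        // deserializing and re-serializing one big PriorityQueue per element.
        void buffer(long timestamp, IN value) throws Exception {
            List<IN> elementsForTimestamp = elementQueueState.get(timestamp);
            if (elementsForTimestamp == null) {
                elementsForTimestamp = new ArrayList<>();
            }
            elementsForTimestamp.add(value);
            elementQueueState.put(timestamp, elementsForTimestamp);
        }

        // On a watermark: only the timestamps (not the buffered elements) go into
        // a local PriorityQueue, then all buckets up to the watermark are drained.
        void drainUpTo(long watermark, EventConsumer<IN> consumer) throws Exception {
            PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>();
            for (Long timestamp : elementQueueState.keys()) {
                sortedTimestamps.offer(timestamp);
            }
            while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= watermark) {
                long timestamp = sortedTimestamps.poll();
                for (IN element : elementQueueState.get(timestamp)) {
                    consumer.accept(element, timestamp);
                }
                elementQueueState.remove(timestamp);
            }
        }

        interface EventConsumer<T> {
            void accept(T element, long timestamp) throws Exception;
        }
    }

With a RocksDB-backed MapState the per-timestamp buckets live in the state
backend, so an incoming element only reads and writes its own bucket rather
than rewriting the whole queue, which is the cost the commit removes.
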


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bd082f8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bd082f8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bd082f8

Branch: refs/heads/release-1.3
Commit: 2bd082f823eca13bf1ece01d39d279365cea7180
Parents: 30fba2b
Author: kkloudas <kk...@gmail.com>
Authored: Mon May 22 11:43:42 2017 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue May 23 10:23:41 2017 +0200

----------------------------------------------------------------------
 .../AbstractKeyedCEPPatternOperator.java        | 152 +++++++++++++------
 .../cep/operator/CEPMigration11to13Test.java    |   7 +-
 .../flink/cep/operator/CEPOperatorTest.java     |  19 +--
 .../src/test/resources/cep-keyed-1_1-snapshot   | Bin 5612 -> 5674 bytes
 .../test/resources/cep-non-keyed-1.1-snapshot   | Bin 3274 -> 3336 bytes
 5 files changed, 118 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2bd082f8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7b6e5e3..af4b53e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -19,6 +19,8 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
@@ -29,6 +31,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -54,6 +58,10 @@ import org.apache.flink.util.Preconditions;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.PriorityQueue;
 
@@ -86,12 +94,11 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 	///////////////			State			//////////////
 
 	private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorStateName";
-	private static final String PRIORITY_QUEUE_STATE_NAME = "priorityQueueStateName";
+	private static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";
 
 	private transient ValueState<NFA<IN>> nfaOperatorState;
-	private transient ValueState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState;
+	private transient MapState<Long, List<IN>> elementQueueState;
 
-	private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
 	private final NFACompiler.NFAFactory<IN> nfaFactory;
 
 	private transient InternalTimerService<VoidNamespace> timerService;
@@ -134,19 +141,13 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 						new NFA.NFASerializer<>(inputSerializer)));
 		}
 
-		@SuppressWarnings("unchecked,rawtypes")
-		TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
-			(TypeSerializer) new StreamElementSerializer<>(inputSerializer);
-
-		if (priorityQueueOperatorState == null) {
-			priorityQueueOperatorState = getRuntimeContext().getState(
-				new ValueStateDescriptor<>(
-					PRIORITY_QUEUE_STATE_NAME,
-					new PriorityQueueSerializer<>(
-						streamRecordSerializer,
-						new PriorityQueueStreamRecordFactory<IN>()
+		if (elementQueueState == null) {
+			elementQueueState = getRuntimeContext().getMapState(
+					new MapStateDescriptor<>(
+							EVENT_QUEUE_STATE_NAME,
+							LongSerializer.INSTANCE,
+							new ListSerializer<>(inputSerializer)
 					)
-				)
 			);
 		}
 	}
@@ -171,25 +172,32 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 
 		} else {
 
+			long timestamp = element.getTimestamp();
+			IN value = element.getValue();
+
 			// In event-time processing we assume correctness of the watermark.
 			// Events with timestamp smaller than the last seen watermark are considered late.
 			// Late events are put in a dedicated side output, if the user has specified one.
 
-			if (element.getTimestamp() >= lastWatermark) {
+			if (timestamp >= lastWatermark) {
 
 				// we have an event with a valid timestamp, so
 				// we buffer it until we receive the proper watermark.
 
 				saveRegisterWatermarkTimer();
 
-				PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+				List<IN> elementsForTimestamp =  elementQueueState.get(timestamp);
+				if (elementsForTimestamp == null) {
+					elementsForTimestamp = new ArrayList<>();
+				}
+
 				if (getExecutionConfig().isObjectReuseEnabled()) {
 					// copy the StreamRecord so that it cannot be changed
-					priorityQueue.offer(new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
+					elementsForTimestamp.add(inputSerializer.copy(value));
 				} else {
-					priorityQueue.offer(element);
+					elementsForTimestamp.add(element.getValue());
 				}
-				updatePriorityQueue(priorityQueue);
+				elementQueueState.put(timestamp, elementsForTimestamp);
 			}
 		}
 	}
@@ -218,23 +226,28 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 		// 5) update the last seen watermark.
 
 		// STEP 1
-		PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+		PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
 		NFA<IN> nfa = getNFA();
 
 		// STEP 2
-		while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= timerService.currentWatermark()) {
-			StreamRecord<IN> streamRecord = priorityQueue.poll();
-			processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+		while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
+			long timestamp = sortedTimestamps.poll();
+			for (IN element: elementQueueState.get(timestamp)) {
+				processEvent(nfa, element, timestamp);
+			}
+			elementQueueState.remove(timestamp);
 		}
 
 		// STEP 3
 		advanceTime(nfa, timerService.currentWatermark());
 
 		// STEP 4
-		updatePriorityQueue(priorityQueue);
+		if (sortedTimestamps.isEmpty()) {
+			elementQueueState.clear();
+		}
 		updateNFA(nfa);
 
-		if (!priorityQueue.isEmpty() || !nfa.isEmpty()) {
+		if (!sortedTimestamps.isEmpty() || !nfa.isEmpty()) {
 			saveRegisterWatermarkTimer();
 		}
 
@@ -264,17 +277,12 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 		}
 	}
 
-	private PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
-		PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueOperatorState.value();
-		return priorityQueue != null ? priorityQueue : priorityQueueFactory.createPriorityQueue();
-	}
-
-	private void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException {
-		if (queue.isEmpty()) {
-			priorityQueueOperatorState.clear();
-		} else {
-			priorityQueueOperatorState.update(queue);
+	private PriorityQueue<Long> getSortedTimestamps() throws Exception {
+		PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>();
+		for (Long timestamp: elementQueueState.keys()) {
+			sortedTimestamps.offer(timestamp);
 		}
+		return sortedTimestamps;
 	}
 
 	/**
@@ -318,6 +326,18 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 		ValueState<NFA<IN>> oldNfaOperatorState = getRuntimeContext().getState(
 				new ValueStateDescriptor<>("nfaOperatorState", new NFA.Serializer<IN>()));
 
+		ValueState<PriorityQueue<StreamRecord<IN>>> oldPriorityQueueOperatorState =
+				getRuntimeContext().getState(
+					new ValueStateDescriptor<>(
+							"priorityQueueStateName",
+							new PriorityQueueSerializer<>(
+									((TypeSerializer) new StreamElementSerializer<>(inputSerializer)),
+									new PriorityQueueStreamRecordFactory<IN>()
+							)
+					)
+			);
+
+
 		if (migratingFromOldKeyedOperator) {
 			int numberEntries = inputView.readInt();
 			for (int i = 0; i < numberEntries; i++) {
@@ -328,6 +348,30 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 				NFA<IN> nfa = oldNfaOperatorState.value();
 				oldNfaOperatorState.clear();
 				nfaOperatorState.update(nfa);
+
+				PriorityQueue<StreamRecord<IN>> priorityQueue = oldPriorityQueueOperatorState.value();
+				if (priorityQueue != null && !priorityQueue.isEmpty()) {
+					Map<Long, List<IN>> elementMap = new HashMap<>();
+					for (StreamRecord<IN> record: priorityQueue) {
+						long timestamp = record.getTimestamp();
+						IN element = record.getValue();
+
+						List<IN> elements = elementMap.get(timestamp);
+						if (elements == null) {
+							elements = new ArrayList<>();
+							elementMap.put(timestamp, elements);
+						}
+						elements.add(element);
+					}
+
+					// write the old state into the new one.
+					for (Map.Entry<Long, List<IN>> entry: elementMap.entrySet()) {
+						elementQueueState.put(entry.getKey(), entry.getValue());
+					}
+
+					// clear the old state
+					oldPriorityQueueOperatorState.clear();
+				}
 			}
 		} else {
 
@@ -339,22 +383,35 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 
 			// retrieve the elements that were pending in the priority queue
 			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
-			PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueFactory.createPriorityQueue();
+
+			Map<Long, List<IN>> elementMap = new HashMap<>();
 			int entries = ois.readInt();
 			for (int i = 0; i < entries; i++) {
 				StreamElement streamElement = recordSerializer.deserialize(inputView);
-				priorityQueue.offer(streamElement.<IN>asRecord());
+				StreamRecord<IN> record = streamElement.<IN>asRecord();
+
+				long timestamp = record.getTimestamp();
+				IN element = record.getValue();
+
+				List<IN> elements = elementMap.get(timestamp);
+				if (elements == null) {
+					elements = new ArrayList<>();
+					elementMap.put(timestamp, elements);
+				}
+				elements.add(element);
 			}
 
 			// finally register the retrieved state with the new keyed state.
 			setCurrentKey((byte) 0);
 			nfaOperatorState.update(nfa);
-			priorityQueueOperatorState.update(priorityQueue);
+
+			// write the priority queue to the new map state.
+			for (Map.Entry<Long, List<IN>> entry: elementMap.entrySet()) {
+				elementQueueState.put(entry.getKey(), entry.getValue());
+			}
 
 			if (!isProcessingTime) {
 				// this is relevant only for event/ingestion time
-
-				// need to work around type restrictions
 				setCurrentKey((byte) 0);
 				saveRegisterWatermarkTimer();
 			}
@@ -546,15 +603,18 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 	}
 
 	@VisibleForTesting
-	public boolean hasNonEmptyPQ(KEY key) throws IOException {
+	public boolean hasNonEmptyPQ(KEY key) throws Exception {
 		setCurrentKey(key);
-		return priorityQueueOperatorState.value() != null;
+		return elementQueueState.keys().iterator().hasNext();
 	}
 
 	@VisibleForTesting
-	public int getPQSize(KEY key) throws IOException {
+	public int getPQSize(KEY key) throws Exception {
 		setCurrentKey(key);
-		PriorityQueue<StreamRecord<IN>> pq = getPriorityQueue();
-		return pq == null ? -1 : pq.size();
+		int counter = 0;
+		for (List<IN> elements: elementQueueState.values()) {
+			counter += elements.size();
+		}
+		return counter;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2bd082f8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 255b8c2..d575e43 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -87,6 +87,9 @@ public class CEPMigration11to13Test {
 		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
 		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
 		harness.processWatermark(new Watermark(2));
+
+		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+
 		// simulate snapshot/restore with empty element queue but NFA state
 		StreamTaskState snapshot = harness.snapshot(1, 1);
 		FileOutputStream out = new FileOutputStream(
@@ -112,7 +115,6 @@ public class CEPMigration11to13Test {
 		harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot"));
 		harness.open();
 
-		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
 		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
 		harness.processElement(new StreamRecord<>(endEvent, 5));
 
@@ -206,6 +208,8 @@ public class CEPMigration11to13Test {
 		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
 		harness.processWatermark(new Watermark(2));
 
+		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+
 		// simulate snapshot/restore with empty element queue but NFA state
 		StreamTaskState snapshot = harness.snapshot(1, 1);
 		FileOutputStream out = new FileOutputStream(
@@ -233,7 +237,6 @@ public class CEPMigration11to13Test {
 		harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-1.1-snapshot"));
 		harness.open();
 
-		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
 		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
 		harness.processElement(new StreamRecord<>(endEvent, 5));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2bd082f8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 95e3a37..ab63479 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -43,7 +43,6 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -140,8 +139,6 @@ public class CEPOperatorTest extends TestLogger {
 	}
 
 	@Test
-	@Ignore
-	// TODO: 5/19/17 Re-instate when checkpoints are fixed
 	public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception {
 
 		String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
@@ -306,11 +303,11 @@ public class CEPOperatorTest extends TestLogger {
 
 		harness.processWatermark(new Watermark(Long.MIN_VALUE));
 
-		harness.processElement(new StreamRecord<>(startEvent1, 1L));
-		harness.processElement(new StreamRecord<>(startEventK2, 1L));
 		harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
 		harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
 		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
+		harness.processElement(new StreamRecord<>(startEvent1, 1L));
+		harness.processElement(new StreamRecord<>(startEventK2, 1L));
 
 		// there must be 2 keys 42, 43 registered for the watermark callback
 		// all the seen elements must be in the priority queues but no NFA yet.
@@ -404,13 +401,13 @@ public class CEPOperatorTest extends TestLogger {
 
 		harness.processWatermark(new Watermark(Long.MIN_VALUE));
 
+		harness.processElement(new StreamRecord<>(middle2Event1, 6));
+		harness.processElement(new StreamRecord<>(middle1Event3, 7));
 		harness.processElement(new StreamRecord<>(startEvent, 1));
 		harness.processElement(new StreamRecord<>(middle1Event1, 3));
-		harness.processElement(new StreamRecord<>(middle1Event1, 3)); // this and the following get reordered
 		harness.processElement(new StreamRecord<>(middle1Event2, 3));
+		harness.processElement(new StreamRecord<>(middle1Event1, 3));
 		harness.processElement(new StreamRecord<>(new Event(41, "d", 6.0), 5));
-		harness.processElement(new StreamRecord<>(middle2Event1, 6));
-		harness.processElement(new StreamRecord<>(middle1Event3, 7));
 
 		assertEquals(1L, harness.numEventTimeTimers());
 		assertEquals(7L, operator.getPQSize(41));
@@ -554,8 +551,6 @@ public class CEPOperatorTest extends TestLogger {
 	}
 
 	@Test
-	@Ignore
-	// TODO: 5/19/17 Re-instate when checkpoints are fixed
 	public void testCEPOperatorSerializationWRocksDB() throws Exception {
 		String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
 		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
@@ -626,13 +621,13 @@ public class CEPOperatorTest extends TestLogger {
 		harness.processElement(new StreamRecord<>(startEvent1, 1));
 		harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
 		harness.processWatermark(2L);
+		harness.processElement(new StreamRecord<Event>(middleEvent3, 5));
 		harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
 		harness.processElement(new StreamRecord<>(startEvent2, 4));
-		harness.processElement(new StreamRecord<Event>(middleEvent3, 5));
 		harness.processWatermark(5L);
-		harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
 		harness.processElement(new StreamRecord<>(nextOne, 6));
 		harness.processElement(new StreamRecord<>(endEvent, 8));
+		harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
 		harness.processWatermark(100L);
 
 		List<List<Event>> resultingPatterns = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/2bd082f8/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot
index 277de1d..75655c6 100644
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot and b/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2bd082f8/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot
index b5ca51e..68ca0ec 100644
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot and b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot differ