You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/05/17 12:38:53 UTC

[7/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library.

http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 88a5703..824df2d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -32,9 +32,9 @@ import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.net.URL;
@@ -57,7 +57,6 @@ public class CEPMigration11to13Test {
 	}
 
 	@Test
-	@Ignore
 	public void testKeyedCEPOperatorMigratation() throws Exception {
 
 		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
@@ -136,11 +135,58 @@ public class CEPMigration11to13Test {
 		assertEquals(middleEvent, patternMap.get("middle").get(0));
 		assertEquals(endEvent, patternMap.get("end").get(0));
 
+		// and now go for a checkpoint with the new serializers
+
+		final Event startEvent1 = new Event(42, "start", 2.0);
+		final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
+		final Event endEvent1 = new Event(42, "end", 2.0);
+
+		harness.processElement(new StreamRecord<Event>(startEvent1, 21));
+		harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
+
+		// simulate snapshot/restore with some elements in internal sorting queue
+		OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+		harness.close();
+
+		harness = new KeyedOneInputStreamOperatorTestHarness<>(
+						new KeyedCEPPatternOperator<>(
+								Event.createTypeSerializer(),
+								false,
+								IntSerializer.INSTANCE,
+								new NFAFactory(),
+								true),
+						keySelector,
+						BasicTypeInfo.INT_TYPE_INFO);
+
+		harness.setup();
+		harness.initializeState(snapshot);
+		harness.open();
+
+		harness.processElement(new StreamRecord<>(endEvent1, 25));
+
+		harness.processWatermark(new Watermark(50));
+
+		result = harness.getOutput();
+
+		// watermark and the result
+		assertEquals(2, result.size());
+
+		Object resultObject1 = result.poll();
+		assertTrue(resultObject1 instanceof StreamRecord);
+		StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+		assertTrue(resultRecord1.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
+
+		assertEquals(startEvent1, patternMap1.get("start").get(0));
+		assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+		assertEquals(endEvent1, patternMap1.get("end").get(0));
+
 		harness.close();
 	}
 
 	@Test
-	@Ignore
 	public void testNonKeyedCEPFunctionMigration() throws Exception {
 
 		final Event startEvent = new Event(42, "start", 1.0);
@@ -191,7 +237,7 @@ public class CEPMigration11to13Test {
 		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
 		harness.processElement(new StreamRecord<>(endEvent, 5));
 
-		harness.processWatermark(new Watermark(Long.MAX_VALUE));
+		harness.processWatermark(new Watermark(20));
 
 		ConcurrentLinkedQueue<Object> result = harness.getOutput();
 
@@ -210,6 +256,54 @@ public class CEPMigration11to13Test {
 		assertEquals(middleEvent, patternMap.get("middle").get(0));
 		assertEquals(endEvent, patternMap.get("end").get(0));
 
+		// and now go for a checkpoint with the new serializers
+
+		final Event startEvent1 = new Event(42, "start", 2.0);
+		final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
+		final Event endEvent1 = new Event(42, "end", 2.0);
+
+		harness.processElement(new StreamRecord<Event>(startEvent1, 21));
+		harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
+
+		// simulate snapshot/restore with some elements in internal sorting queue
+		OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+		harness.close();
+
+		harness = new KeyedOneInputStreamOperatorTestHarness<>(
+						new KeyedCEPPatternOperator<>(
+								Event.createTypeSerializer(),
+								false,
+								ByteSerializer.INSTANCE,
+								new NFAFactory(),
+								false),
+						keySelector,
+						BasicTypeInfo.BYTE_TYPE_INFO);
+
+		harness.setup();
+		harness.initializeState(snapshot);
+		harness.open();
+
+		harness.processElement(new StreamRecord<>(endEvent1, 25));
+
+		harness.processWatermark(new Watermark(50));
+
+		result = harness.getOutput();
+
+		// watermark and the result
+		assertEquals(2, result.size());
+
+		Object resultObject1 = result.poll();
+		assertTrue(resultObject1 instanceof StreamRecord);
+		StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+		assertTrue(resultRecord1.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
+
+		assertEquals(startEvent1, patternMap1.get("start").get(0));
+		assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+		assertEquals(endEvent1, patternMap1.get("end").get(0));
+
 		harness.close();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index eb50dfd..41593b0 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -379,7 +380,6 @@ public class CEPOperatorTest extends TestLogger {
 		Event middle1Event3 = new Event(41, "a", 4.0);
 		Event middle2Event1 = new Event(41, "b", 5.0);
 
-		TestKeySelector keySelector = new TestKeySelector();
 		KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>(
 				Event.createTypeSerializer(),
 				false,
@@ -530,7 +530,113 @@ public class CEPOperatorTest extends TestLogger {
 
 		harness.close();
 	}
-	
+
+	@Test
+	public void testCEPOperatorSerializationWRocksDB() throws Exception {
+		String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
+		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
+		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
+
+		final Event startEvent1 = new Event(40, "start", 1.0);
+		final Event startEvent2 = new Event(40, "start", 2.0);
+		final SubEvent middleEvent1 = new SubEvent(40, "foo1", 1.0, 10);
+		final SubEvent middleEvent2 = new SubEvent(40, "foo2", 2.0, 10);
+		final SubEvent middleEvent3 = new SubEvent(40, "foo3", 3.0, 10);
+		final SubEvent middleEvent4 = new SubEvent(40, "foo4", 1.0, 10);
+		final Event nextOne = new Event(40, "next-one", 1.0);
+		final Event endEvent = new Event(40, "end", 1.0);
+
+		final Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).followedBy("middle").subtype(SubEvent.class).where(new IterativeCondition<SubEvent>() {
+
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter (SubEvent value, Context < SubEvent > ctx) throws Exception {
+				if (!value.getName().startsWith("foo")) {
+					return false;
+				}
+
+				double sum = 0.0;
+				for (Event event : ctx.getEventsForPattern("middle")) {
+					sum += event.getPrice();
+				}
+				sum += value.getPrice();
+				return Double.compare(sum, 5.0) < 0;
+			}
+		}).oneOrMore().allowCombinations().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 7056763917392056548L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>(
+				Event.createTypeSerializer(),
+				false,
+				IntSerializer.INSTANCE,
+				new NFACompiler.NFAFactory<Event>() {
+
+					private static final long serialVersionUID = 477082663248051994L;
+
+					@Override
+					public NFA<Event> createNFA() {
+						return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+					}
+				},
+				true);
+
+		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
+		harness.setStateBackend(rocksDBStateBackend);
+		harness.open();
+
+		harness.processWatermark(0L);
+		harness.processElement(new StreamRecord<>(startEvent1, 1));
+		harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+		harness.processWatermark(2L);
+		harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
+		harness.processElement(new StreamRecord<>(startEvent2, 4));
+		harness.processElement(new StreamRecord<Event>(middleEvent3, 5));
+		harness.processWatermark(5L);
+		harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
+		harness.processElement(new StreamRecord<>(nextOne, 6));
+		harness.processElement(new StreamRecord<>(endEvent, 8));
+		harness.processWatermark(100L);
+
+		List<List<Event>> resultingPatterns = new ArrayList<>();
+		while (!harness.getOutput().isEmpty()) {
+			Object o = harness.getOutput().poll();
+			if (!(o instanceof Watermark)) {
+				StreamRecord<Map<String, List<Event>>> el = (StreamRecord<Map<String, List<Event>>>) o;
+				List<Event> res = new ArrayList<>();
+				for (List<Event> le: el.getValue().values()) {
+					res.addAll(le);
+				}
+				resultingPatterns.add(res);
+			}
+		}
+
+		compareMaps(resultingPatterns,
+				Lists.<List<Event>>newArrayList(
+						Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1),
+						Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1),
+						Lists.newArrayList(startEvent1, endEvent, middleEvent1),
+						Lists.newArrayList(startEvent2, endEvent, middleEvent3)
+				)
+		);
+	}
+
 	private void verifyWatermark(Object outputObject, long timestamp) {
 		assertTrue(outputObject instanceof Watermark);
 		assertEquals(timestamp, ((Watermark) outputObject).getTimestamp());