You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/05/17 12:38:53 UTC
[7/9] flink git commit: [FLINK-6604] [cep] Remove java serialization
from the library.
http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 88a5703..824df2d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -32,9 +32,9 @@ import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.junit.Ignore;
import org.junit.Test;
import java.net.URL;
@@ -57,7 +57,6 @@ public class CEPMigration11to13Test {
}
@Test
- @Ignore
public void testKeyedCEPOperatorMigratation() throws Exception {
KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
@@ -136,11 +135,58 @@ public class CEPMigration11to13Test {
assertEquals(middleEvent, patternMap.get("middle").get(0));
assertEquals(endEvent, patternMap.get("end").get(0));
+ // and now go for a checkpoint with the new serializers
+
+ final Event startEvent1 = new Event(42, "start", 2.0);
+ final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
+ final Event endEvent1 = new Event(42, "end", 2.0);
+
+ harness.processElement(new StreamRecord<Event>(startEvent1, 21));
+ harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
+
+ // simulate snapshot/restore with some elements in internal sorting queue
+ OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+ harness.close();
+
+ harness = new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ IntSerializer.INSTANCE,
+ new NFAFactory(),
+ true),
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ harness.setup();
+ harness.initializeState(snapshot);
+ harness.open();
+
+ harness.processElement(new StreamRecord<>(endEvent1, 25));
+
+ harness.processWatermark(new Watermark(50));
+
+ result = harness.getOutput();
+
+ // watermark and the result
+ assertEquals(2, result.size());
+
+ Object resultObject1 = result.poll();
+ assertTrue(resultObject1 instanceof StreamRecord);
+ StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+ assertTrue(resultRecord1.getValue() instanceof Map);
+
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
+
+ assertEquals(startEvent1, patternMap1.get("start").get(0));
+ assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+ assertEquals(endEvent1, patternMap1.get("end").get(0));
+
harness.close();
}
@Test
- @Ignore
public void testNonKeyedCEPFunctionMigration() throws Exception {
final Event startEvent = new Event(42, "start", 1.0);
@@ -191,7 +237,7 @@ public class CEPMigration11to13Test {
harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
harness.processElement(new StreamRecord<>(endEvent, 5));
- harness.processWatermark(new Watermark(Long.MAX_VALUE));
+ harness.processWatermark(new Watermark(20));
ConcurrentLinkedQueue<Object> result = harness.getOutput();
@@ -210,6 +256,54 @@ public class CEPMigration11to13Test {
assertEquals(middleEvent, patternMap.get("middle").get(0));
assertEquals(endEvent, patternMap.get("end").get(0));
+ // and now go for a checkpoint with the new serializers
+
+ final Event startEvent1 = new Event(42, "start", 2.0);
+ final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
+ final Event endEvent1 = new Event(42, "end", 2.0);
+
+ harness.processElement(new StreamRecord<Event>(startEvent1, 21));
+ harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
+
+ // simulate snapshot/restore with some elements in internal sorting queue
+ OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+ harness.close();
+
+ harness = new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ ByteSerializer.INSTANCE,
+ new NFAFactory(),
+ false),
+ keySelector,
+ BasicTypeInfo.BYTE_TYPE_INFO);
+
+ harness.setup();
+ harness.initializeState(snapshot);
+ harness.open();
+
+ harness.processElement(new StreamRecord<>(endEvent1, 25));
+
+ harness.processWatermark(new Watermark(50));
+
+ result = harness.getOutput();
+
+ // watermark and the result
+ assertEquals(2, result.size());
+
+ Object resultObject1 = result.poll();
+ assertTrue(resultObject1 instanceof StreamRecord);
+ StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+ assertTrue(resultRecord1.getValue() instanceof Map);
+
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
+
+ assertEquals(startEvent1, patternMap1.get("start").get(0));
+ assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+ assertEquals(endEvent1, patternMap1.get("end").get(0));
+
harness.close();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index eb50dfd..41593b0 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -379,7 +380,6 @@ public class CEPOperatorTest extends TestLogger {
Event middle1Event3 = new Event(41, "a", 4.0);
Event middle2Event1 = new Event(41, "b", 5.0);
- TestKeySelector keySelector = new TestKeySelector();
KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
@@ -530,7 +530,113 @@ public class CEPOperatorTest extends TestLogger {
harness.close();
}
-
+
+ @Test
+ public void testCEPOperatorSerializationWRocksDB() throws Exception {
+ String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
+ RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
+ rocksDBStateBackend.setDbStoragePath(rocksDbPath);
+
+ final Event startEvent1 = new Event(40, "start", 1.0);
+ final Event startEvent2 = new Event(40, "start", 2.0);
+ final SubEvent middleEvent1 = new SubEvent(40, "foo1", 1.0, 10);
+ final SubEvent middleEvent2 = new SubEvent(40, "foo2", 2.0, 10);
+ final SubEvent middleEvent3 = new SubEvent(40, "foo3", 3.0, 10);
+ final SubEvent middleEvent4 = new SubEvent(40, "foo4", 1.0, 10);
+ final Event nextOne = new Event(40, "next-one", 1.0);
+ final Event endEvent = new Event(40, "end", 1.0);
+
+ final Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("start");
+ }
+ }).followedBy("middle").subtype(SubEvent.class).where(new IterativeCondition<SubEvent>() {
+
+ private static final long serialVersionUID = 6215754202506583964L;
+
+ @Override
+ public boolean filter (SubEvent value, Context < SubEvent > ctx) throws Exception {
+ if (!value.getName().startsWith("foo")) {
+ return false;
+ }
+
+ double sum = 0.0;
+ for (Event event : ctx.getEventsForPattern("middle")) {
+ sum += event.getPrice();
+ }
+ sum += value.getPrice();
+ return Double.compare(sum, 5.0) < 0;
+ }
+ }).oneOrMore().allowCombinations().followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 7056763917392056548L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("end");
+ }
+ });
+
+ KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ IntSerializer.INSTANCE,
+ new NFACompiler.NFAFactory<Event>() {
+
+ private static final long serialVersionUID = 477082663248051994L;
+
+ @Override
+ public NFA<Event> createNFA() {
+ return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ }
+ },
+ true);
+
+ OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
+ harness.setStateBackend(rocksDBStateBackend);
+ harness.open();
+
+ harness.processWatermark(0L);
+ harness.processElement(new StreamRecord<>(startEvent1, 1));
+ harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+ harness.processWatermark(2L);
+ harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
+ harness.processElement(new StreamRecord<>(startEvent2, 4));
+ harness.processElement(new StreamRecord<Event>(middleEvent3, 5));
+ harness.processWatermark(5L);
+ harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
+ harness.processElement(new StreamRecord<>(nextOne, 6));
+ harness.processElement(new StreamRecord<>(endEvent, 8));
+ harness.processWatermark(100L);
+
+ List<List<Event>> resultingPatterns = new ArrayList<>();
+ while (!harness.getOutput().isEmpty()) {
+ Object o = harness.getOutput().poll();
+ if (!(o instanceof Watermark)) {
+ StreamRecord<Map<String, List<Event>>> el = (StreamRecord<Map<String, List<Event>>>) o;
+ List<Event> res = new ArrayList<>();
+ for (List<Event> le: el.getValue().values()) {
+ res.addAll(le);
+ }
+ resultingPatterns.add(res);
+ }
+ }
+
+ compareMaps(resultingPatterns,
+ Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
+ Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
+ Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1),
+ Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
+ Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1),
+ Lists.newArrayList(startEvent1, endEvent, middleEvent1),
+ Lists.newArrayList(startEvent2, endEvent, middleEvent3)
+ )
+ );
+ }
+
private void verifyWatermark(Object outputObject, long timestamp) {
assertTrue(outputObject instanceof Watermark);
assertEquals(timestamp, ((Watermark) outputObject).getTimestamp());