You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/26 12:10:08 UTC
[3/3] flink git commit: [FLINK-6702] put the CEP tests'
harness.close() calls into a finally block
[FLINK-6702] put the CEP tests' harness.close() calls into a finally block
This closes #3978.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38c45f80
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38c45f80
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38c45f80
Branch: refs/heads/master
Commit: 38c45f8052166b93bfeebe7aed88a16c53a9332a
Parents: d5ab636
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed May 24 15:57:24 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 26 11:34:24 2017 +0200
----------------------------------------------------------------------
.../cep/operator/CEPFrom12MigrationTest.java | 408 +++++++------
.../cep/operator/CEPMigration11to13Test.java | 230 +++----
.../flink/cep/operator/CEPOperatorTest.java | 608 ++++++++++---------
.../flink/cep/operator/CEPRescalingTest.java | 287 +++++----
4 files changed, 812 insertions(+), 721 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/38c45f80/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index fb05901..0345192 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -84,22 +84,26 @@ public class CEPFrom12MigrationTest {
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
- harness.setup();
- harness.open();
-
- harness.processElement(new StreamRecord<Event>(startEvent, 1));
- harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
- harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
- harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
- harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
-
- harness.processWatermark(new Watermark(5));
-
- // do snapshot and save to file
- OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
- OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-after-branching-flink1.2-snapshot");
-
- harness.close();
+ try {
+ harness.setup();
+ harness.open();
+
+ harness.processElement(new StreamRecord<Event>(startEvent, 1));
+ harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+ harness
+ .processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+ harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+ harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
+
+ harness.processWatermark(new Watermark(5));
+
+ // do snapshot and save to file
+ OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+ OperatorSnapshotUtil.writeStateHandle(snapshot,
+ "src/test/resources/cep-migration-after-branching-flink1.2-snapshot");
+ } finally {
+ harness.close();
+ }
}
@Test
@@ -130,95 +134,101 @@ public class CEPFrom12MigrationTest {
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
- harness.setup();
- harness.initializeState(
+ try {
+ harness.setup();
+ harness.initializeState(
OperatorSnapshotUtil.readStateHandle(
- OperatorSnapshotUtil.getResourceFilename("cep-migration-after-branching-flink1.2-snapshot")));
- harness.open();
+ OperatorSnapshotUtil
+ .getResourceFilename("cep-migration-after-branching-flink1.2-snapshot")));
+ harness.open();
- harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
- harness.processElement(new StreamRecord<>(endEvent, 5));
+ harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+ harness.processElement(new StreamRecord<>(endEvent, 5));
- harness.processWatermark(new Watermark(20));
+ harness.processWatermark(new Watermark(20));
- ConcurrentLinkedQueue<Object> result = harness.getOutput();
+ ConcurrentLinkedQueue<Object> result = harness.getOutput();
- // watermark and 2 results
- assertEquals(3, result.size());
+ // watermark and 2 results
+ assertEquals(3, result.size());
- Object resultObject1 = result.poll();
- assertTrue(resultObject1 instanceof StreamRecord);
- StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
- assertTrue(resultRecord1.getValue() instanceof Map);
+ Object resultObject1 = result.poll();
+ assertTrue(resultObject1 instanceof StreamRecord);
+ StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+ assertTrue(resultRecord1.getValue() instanceof Map);
- Object resultObject2 = result.poll();
- assertTrue(resultObject2 instanceof StreamRecord);
- StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
- assertTrue(resultRecord2.getValue() instanceof Map);
+ Object resultObject2 = result.poll();
+ assertTrue(resultObject2 instanceof StreamRecord);
+ StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+ assertTrue(resultRecord2.getValue() instanceof Map);
- @SuppressWarnings("unchecked")
- Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap1 =
+ (Map<String, List<Event>>) resultRecord1.getValue();
- assertEquals(startEvent, patternMap1.get("start").get(0));
- assertEquals(middleEvent1, patternMap1.get("middle").get(0));
- assertEquals(endEvent, patternMap1.get("end").get(0));
+ assertEquals(startEvent, patternMap1.get("start").get(0));
+ assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+ assertEquals(endEvent, patternMap1.get("end").get(0));
- @SuppressWarnings("unchecked")
- Map<String, List<Event>> patternMap2 = (Map<String, List<Event>>) resultRecord2.getValue();
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap2 =
+ (Map<String, List<Event>>) resultRecord2.getValue();
- assertEquals(startEvent, patternMap2.get("start").get(0));
- assertEquals(middleEvent2, patternMap2.get("middle").get(0));
- assertEquals(endEvent, patternMap2.get("end").get(0));
+ assertEquals(startEvent, patternMap2.get("start").get(0));
+ assertEquals(middleEvent2, patternMap2.get("middle").get(0));
+ assertEquals(endEvent, patternMap2.get("end").get(0));
- // and now go for a checkpoint with the new serializers
+ // and now go for a checkpoint with the new serializers
- final Event startEvent1 = new Event(42, "start", 2.0);
- final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0);
- final Event endEvent1 = new Event(42, "end", 2.0);
+ final Event startEvent1 = new Event(42, "start", 2.0);
+ final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0);
+ final Event endEvent1 = new Event(42, "end", 2.0);
- harness.processElement(new StreamRecord<Event>(startEvent1, 21));
- harness.processElement(new StreamRecord<Event>(middleEvent3, 23));
+ harness.processElement(new StreamRecord<Event>(startEvent1, 21));
+ harness.processElement(new StreamRecord<Event>(middleEvent3, 23));
- // simulate snapshot/restore with some elements in internal sorting queue
- OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
- harness.close();
+ // simulate snapshot/restore with some elements in internal sorting queue
+ OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+ harness.close();
- harness = new KeyedOneInputStreamOperatorTestHarness<>(
+ harness = new KeyedOneInputStreamOperatorTestHarness<>(
new KeyedCEPPatternOperator<>(
- Event.createTypeSerializer(),
- false,
- IntSerializer.INSTANCE,
- new NFAFactory(),
- true),
+ Event.createTypeSerializer(),
+ false,
+ IntSerializer.INSTANCE,
+ new NFAFactory(),
+ true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
- harness.setup();
- harness.initializeState(snapshot);
- harness.open();
+ harness.setup();
+ harness.initializeState(snapshot);
+ harness.open();
- harness.processElement(new StreamRecord<>(endEvent1, 25));
+ harness.processElement(new StreamRecord<>(endEvent1, 25));
- harness.processWatermark(new Watermark(50));
+ harness.processWatermark(new Watermark(50));
- result = harness.getOutput();
+ result = harness.getOutput();
- // watermark and the result
- assertEquals(2, result.size());
+ // watermark and the result
+ assertEquals(2, result.size());
- Object resultObject3 = result.poll();
- assertTrue(resultObject3 instanceof StreamRecord);
- StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
- assertTrue(resultRecord3.getValue() instanceof Map);
+ Object resultObject3 = result.poll();
+ assertTrue(resultObject3 instanceof StreamRecord);
+ StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
+ assertTrue(resultRecord3.getValue() instanceof Map);
- @SuppressWarnings("unchecked")
- Map<String, List<Event>> patternMap3 = (Map<String, List<Event>>) resultRecord3.getValue();
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap3 =
+ (Map<String, List<Event>>) resultRecord3.getValue();
- assertEquals(startEvent1, patternMap3.get("start").get(0));
- assertEquals(middleEvent3, patternMap3.get("middle").get(0));
- assertEquals(endEvent1, patternMap3.get("end").get(0));
-
- harness.close();
+ assertEquals(startEvent1, patternMap3.get("start").get(0));
+ assertEquals(middleEvent3, patternMap3.get("middle").get(0));
+ assertEquals(endEvent1, patternMap3.get("end").get(0));
+ } finally {
+ harness.close();
+ }
}
/**
@@ -251,19 +261,23 @@ public class CEPFrom12MigrationTest {
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
- harness.setup();
- harness.open();
- harness.processElement(new StreamRecord<Event>(startEvent1, 1));
- harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
- harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
- harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
- harness.processWatermark(new Watermark(5));
-
- // do snapshot and save to file
- OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
- OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot");
-
- harness.close();
+ try {
+ harness.setup();
+ harness.open();
+ harness.processElement(new StreamRecord<Event>(startEvent1, 1));
+ harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+ harness
+ .processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+ harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+ harness.processWatermark(new Watermark(5));
+
+ // do snapshot and save to file
+ OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+ OperatorSnapshotUtil.writeStateHandle(snapshot,
+ "src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot");
+ } finally {
+ harness.close();
+ }
}
@Test
@@ -295,108 +309,115 @@ public class CEPFrom12MigrationTest {
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
- harness.setup();
- harness.initializeState(
+ try {
+ harness.setup();
+ harness.initializeState(
OperatorSnapshotUtil.readStateHandle(
- OperatorSnapshotUtil.getResourceFilename("cep-migration-starting-new-pattern-flink1.2-snapshot")));
- harness.open();
+ OperatorSnapshotUtil.getResourceFilename(
+ "cep-migration-starting-new-pattern-flink1.2-snapshot")));
+ harness.open();
- harness.processElement(new StreamRecord<>(startEvent2, 5));
- harness.processElement(new StreamRecord<Event>(middleEvent2, 6));
- harness.processElement(new StreamRecord<>(endEvent, 7));
+ harness.processElement(new StreamRecord<>(startEvent2, 5));
+ harness.processElement(new StreamRecord<Event>(middleEvent2, 6));
+ harness.processElement(new StreamRecord<>(endEvent, 7));
- harness.processWatermark(new Watermark(20));
+ harness.processWatermark(new Watermark(20));
- ConcurrentLinkedQueue<Object> result = harness.getOutput();
+ ConcurrentLinkedQueue<Object> result = harness.getOutput();
- // watermark and 3 results
- assertEquals(4, result.size());
+ // watermark and 3 results
+ assertEquals(4, result.size());
- Object resultObject1 = result.poll();
- assertTrue(resultObject1 instanceof StreamRecord);
- StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
- assertTrue(resultRecord1.getValue() instanceof Map);
+ Object resultObject1 = result.poll();
+ assertTrue(resultObject1 instanceof StreamRecord);
+ StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+ assertTrue(resultRecord1.getValue() instanceof Map);
- Object resultObject2 = result.poll();
- assertTrue(resultObject2 instanceof StreamRecord);
- StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
- assertTrue(resultRecord2.getValue() instanceof Map);
+ Object resultObject2 = result.poll();
+ assertTrue(resultObject2 instanceof StreamRecord);
+ StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+ assertTrue(resultRecord2.getValue() instanceof Map);
- Object resultObject3 = result.poll();
- assertTrue(resultObject3 instanceof StreamRecord);
- StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
- assertTrue(resultRecord3.getValue() instanceof Map);
+ Object resultObject3 = result.poll();
+ assertTrue(resultObject3 instanceof StreamRecord);
+ StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
+ assertTrue(resultRecord3.getValue() instanceof Map);
- @SuppressWarnings("unchecked")
- Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap1 =
+ (Map<String, List<Event>>) resultRecord1.getValue();
- assertEquals(startEvent1, patternMap1.get("start").get(0));
- assertEquals(middleEvent1, patternMap1.get("middle").get(0));
- assertEquals(endEvent, patternMap1.get("end").get(0));
+ assertEquals(startEvent1, patternMap1.get("start").get(0));
+ assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+ assertEquals(endEvent, patternMap1.get("end").get(0));
- @SuppressWarnings("unchecked")
- Map<String, List<Event>> patternMap2 = (Map<String, List<Event>>) resultRecord2.getValue();
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap2 =
+ (Map<String, List<Event>>) resultRecord2.getValue();
- assertEquals(startEvent1, patternMap2.get("start").get(0));
- assertEquals(middleEvent2, patternMap2.get("middle").get(0));
- assertEquals(endEvent, patternMap2.get("end").get(0));
+ assertEquals(startEvent1, patternMap2.get("start").get(0));
+ assertEquals(middleEvent2, patternMap2.get("middle").get(0));
+ assertEquals(endEvent, patternMap2.get("end").get(0));
- @SuppressWarnings("unchecked")
- Map<String, List<Event>> patternMap3 = (Map<String, List<Event>>) resultRecord3.getValue();
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap3 =
+ (Map<String, List<Event>>) resultRecord3.getValue();
- assertEquals(startEvent2, patternMap3.get("start").get(0));
- assertEquals(middleEvent2, patternMap3.get("middle").get(0));
- assertEquals(endEvent, patternMap3.get("end").get(0));
+ assertEquals(startEvent2, patternMap3.get("start").get(0));
+ assertEquals(middleEvent2, patternMap3.get("middle").get(0));
+ assertEquals(endEvent, patternMap3.get("end").get(0));
- // and now go for a checkpoint with the new serializers
+ // and now go for a checkpoint with the new serializers
- final Event startEvent3 = new Event(42, "start", 2.0);
- final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0);
- final Event endEvent1 = new Event(42, "end", 2.0);
+ final Event startEvent3 = new Event(42, "start", 2.0);
+ final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0);
+ final Event endEvent1 = new Event(42, "end", 2.0);
- harness.processElement(new StreamRecord<Event>(startEvent3, 21));
- harness.processElement(new StreamRecord<Event>(middleEvent3, 23));
+ harness.processElement(new StreamRecord<Event>(startEvent3, 21));
+ harness.processElement(new StreamRecord<Event>(middleEvent3, 23));
- // simulate snapshot/restore with some elements in internal sorting queue
- OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
- harness.close();
+ // simulate snapshot/restore with some elements in internal sorting queue
+ OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+ harness.close();
- harness = new KeyedOneInputStreamOperatorTestHarness<>(
+ harness = new KeyedOneInputStreamOperatorTestHarness<>(
new KeyedCEPPatternOperator<>(
- Event.createTypeSerializer(),
- false,
- IntSerializer.INSTANCE,
- new NFAFactory(),
- true),
+ Event.createTypeSerializer(),
+ false,
+ IntSerializer.INSTANCE,
+ new NFAFactory(),
+ true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
- harness.setup();
- harness.initializeState(snapshot);
- harness.open();
-
- harness.processElement(new StreamRecord<>(endEvent1, 25));
+ harness.setup();
+ harness.initializeState(snapshot);
+ harness.open();
- harness.processWatermark(new Watermark(50));
+ harness.processElement(new StreamRecord<>(endEvent1, 25));
- result = harness.getOutput();
+ harness.processWatermark(new Watermark(50));
- // watermark and the result
- assertEquals(2, result.size());
+ result = harness.getOutput();
- Object resultObject4 = result.poll();
- assertTrue(resultObject4 instanceof StreamRecord);
- StreamRecord<?> resultRecord4 = (StreamRecord<?>) resultObject4;
- assertTrue(resultRecord4.getValue() instanceof Map);
+ // watermark and the result
+ assertEquals(2, result.size());
- @SuppressWarnings("unchecked")
- Map<String, List<Event>> patternMap4 = (Map<String, List<Event>>) resultRecord4.getValue();
+ Object resultObject4 = result.poll();
+ assertTrue(resultObject4 instanceof StreamRecord);
+ StreamRecord<?> resultRecord4 = (StreamRecord<?>) resultObject4;
+ assertTrue(resultRecord4.getValue() instanceof Map);
- assertEquals(startEvent3, patternMap4.get("start").get(0));
- assertEquals(middleEvent3, patternMap4.get("middle").get(0));
- assertEquals(endEvent1, patternMap4.get("end").get(0));
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap4 =
+ (Map<String, List<Event>>) resultRecord4.getValue();
- harness.close();
+ assertEquals(startEvent3, patternMap4.get("start").get(0));
+ assertEquals(middleEvent3, patternMap4.get("middle").get(0));
+ assertEquals(endEvent1, patternMap4.get("end").get(0));
+ } finally {
+ harness.close();
+ }
}
/**
@@ -428,15 +449,18 @@ public class CEPFrom12MigrationTest {
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
- harness.setup();
- harness.open();
- harness.processWatermark(new Watermark(5));
-
- // do snapshot and save to file
- OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
- OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot");
-
- harness.close();
+ try {
+ harness.setup();
+ harness.open();
+ harness.processWatermark(new Watermark(5));
+
+ // do snapshot and save to file
+ OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+ OperatorSnapshotUtil.writeStateHandle(snapshot,
+ "src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot");
+ } finally {
+ harness.close();
+ }
}
@@ -465,32 +489,36 @@ public class CEPFrom12MigrationTest {
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
- harness.setup();
- harness.initializeState(
+ try {
+ harness.setup();
+ harness.initializeState(
OperatorSnapshotUtil.readStateHandle(
- OperatorSnapshotUtil.getResourceFilename("cep-migration-single-pattern-afterwards-flink1.2-snapshot")));
- harness.open();
+ OperatorSnapshotUtil.getResourceFilename(
+ "cep-migration-single-pattern-afterwards-flink1.2-snapshot")));
+ harness.open();
- harness.processElement(new StreamRecord<>(startEvent1, 5));
+ harness.processElement(new StreamRecord<>(startEvent1, 5));
- harness.processWatermark(new Watermark(20));
+ harness.processWatermark(new Watermark(20));
- ConcurrentLinkedQueue<Object> result = harness.getOutput();
+ ConcurrentLinkedQueue<Object> result = harness.getOutput();
- // watermark and the result
- assertEquals(2, result.size());
+ // watermark and the result
+ assertEquals(2, result.size());
- Object resultObject = result.poll();
- assertTrue(resultObject instanceof StreamRecord);
- StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
- assertTrue(resultRecord.getValue() instanceof Map);
+ Object resultObject = result.poll();
+ assertTrue(resultObject instanceof StreamRecord);
+ StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+ assertTrue(resultRecord.getValue() instanceof Map);
- @SuppressWarnings("unchecked")
- Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap =
+ (Map<String, List<Event>>) resultRecord.getValue();
- assertEquals(startEvent1, patternMap.get("start").get(0));
-
- harness.close();
+ assertEquals(startEvent1, patternMap.get("start").get(0));
+ } finally {
+ harness.close();
+ }
}
private static class SinglePatternNFAFactory implements NFACompiler.NFAFactory<Event> {
http://git-wip-us.apache.org/repos/asf/flink/blob/38c45f80/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index d575e43..c92f772 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -111,81 +111,86 @@ public class CEPMigration11to13Test {
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
- harness.setup();
- harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot"));
- harness.open();
+ try {
+ harness.setup();
+ harness
+ .initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot"));
+ harness.open();
- harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
- harness.processElement(new StreamRecord<>(endEvent, 5));
+ harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+ harness.processElement(new StreamRecord<>(endEvent, 5));
- harness.processWatermark(new Watermark(20));
+ harness.processWatermark(new Watermark(20));
- ConcurrentLinkedQueue<Object> result = harness.getOutput();
+ ConcurrentLinkedQueue<Object> result = harness.getOutput();
- // watermark and the result
- assertEquals(2, result.size());
+ // watermark and the result
+ assertEquals(2, result.size());
- Object resultObject = result.poll();
- assertTrue(resultObject instanceof StreamRecord);
- StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
- assertTrue(resultRecord.getValue() instanceof Map);
+ Object resultObject = result.poll();
+ assertTrue(resultObject instanceof StreamRecord);
+ StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+ assertTrue(resultRecord.getValue() instanceof Map);
- @SuppressWarnings("unchecked")
- Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap =
+ (Map<String, List<Event>>) resultRecord.getValue();
- assertEquals(startEvent, patternMap.get("start").get(0));
- assertEquals(middleEvent, patternMap.get("middle").get(0));
- assertEquals(endEvent, patternMap.get("end").get(0));
+ assertEquals(startEvent, patternMap.get("start").get(0));
+ assertEquals(middleEvent, patternMap.get("middle").get(0));
+ assertEquals(endEvent, patternMap.get("end").get(0));
- // and now go for a checkpoint with the new serializers
+ // and now go for a checkpoint with the new serializers
- final Event startEvent1 = new Event(42, "start", 2.0);
- final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
- final Event endEvent1 = new Event(42, "end", 2.0);
+ final Event startEvent1 = new Event(42, "start", 2.0);
+ final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
+ final Event endEvent1 = new Event(42, "end", 2.0);
- harness.processElement(new StreamRecord<Event>(startEvent1, 21));
- harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
+ harness.processElement(new StreamRecord<Event>(startEvent1, 21));
+ harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
- // simulate snapshot/restore with some elements in internal sorting queue
- OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
- harness.close();
+ // simulate snapshot/restore with some elements in internal sorting queue
+ OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+ harness.close();
- harness = new KeyedOneInputStreamOperatorTestHarness<>(
- new KeyedCEPPatternOperator<>(
- Event.createTypeSerializer(),
- false,
- IntSerializer.INSTANCE,
- new NFAFactory(),
- true),
- keySelector,
- BasicTypeInfo.INT_TYPE_INFO);
-
- harness.setup();
- harness.initializeState(snapshot);
- harness.open();
+ harness = new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ IntSerializer.INSTANCE,
+ new NFAFactory(),
+ true),
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
- harness.processElement(new StreamRecord<>(endEvent1, 25));
+ harness.setup();
+ harness.initializeState(snapshot);
+ harness.open();
- harness.processWatermark(new Watermark(50));
+ harness.processElement(new StreamRecord<>(endEvent1, 25));
- result = harness.getOutput();
+ harness.processWatermark(new Watermark(50));
- // watermark and the result
- assertEquals(2, result.size());
+ result = harness.getOutput();
- Object resultObject1 = result.poll();
- assertTrue(resultObject1 instanceof StreamRecord);
- StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
- assertTrue(resultRecord1.getValue() instanceof Map);
+ // watermark and the result
+ assertEquals(2, result.size());
- @SuppressWarnings("unchecked")
- Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
+ Object resultObject1 = result.poll();
+ assertTrue(resultObject1 instanceof StreamRecord);
+ StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+ assertTrue(resultRecord1.getValue() instanceof Map);
- assertEquals(startEvent1, patternMap1.get("start").get(0));
- assertEquals(middleEvent1, patternMap1.get("middle").get(0));
- assertEquals(endEvent1, patternMap1.get("end").get(0));
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap1 =
+ (Map<String, List<Event>>) resultRecord1.getValue();
- harness.close();
+ assertEquals(startEvent1, patternMap1.get("start").get(0));
+ assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+ assertEquals(endEvent1, patternMap1.get("end").get(0));
+ } finally {
+ harness.close();
+ }
}
@Test
@@ -233,81 +238,86 @@ public class CEPMigration11to13Test {
keySelector,
BasicTypeInfo.BYTE_TYPE_INFO);
- harness.setup();
- harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-1.1-snapshot"));
- harness.open();
+ try {
+ harness.setup();
+ harness.initializeStateFromLegacyCheckpoint(
+ getResourceFilename("cep-non-keyed-1.1-snapshot"));
+ harness.open();
- harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
- harness.processElement(new StreamRecord<>(endEvent, 5));
+ harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+ harness.processElement(new StreamRecord<>(endEvent, 5));
- harness.processWatermark(new Watermark(20));
+ harness.processWatermark(new Watermark(20));
- ConcurrentLinkedQueue<Object> result = harness.getOutput();
+ ConcurrentLinkedQueue<Object> result = harness.getOutput();
- // watermark and the result
- assertEquals(2, result.size());
+ // watermark and the result
+ assertEquals(2, result.size());
- Object resultObject = result.poll();
- assertTrue(resultObject instanceof StreamRecord);
- StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
- assertTrue(resultRecord.getValue() instanceof Map);
+ Object resultObject = result.poll();
+ assertTrue(resultObject instanceof StreamRecord);
+ StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+ assertTrue(resultRecord.getValue() instanceof Map);
- @SuppressWarnings("unchecked")
- Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap =
+ (Map<String, List<Event>>) resultRecord.getValue();
- assertEquals(startEvent, patternMap.get("start").get(0));
- assertEquals(middleEvent, patternMap.get("middle").get(0));
- assertEquals(endEvent, patternMap.get("end").get(0));
+ assertEquals(startEvent, patternMap.get("start").get(0));
+ assertEquals(middleEvent, patternMap.get("middle").get(0));
+ assertEquals(endEvent, patternMap.get("end").get(0));
- // and now go for a checkpoint with the new serializers
+ // and now go for a checkpoint with the new serializers
- final Event startEvent1 = new Event(42, "start", 2.0);
- final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
- final Event endEvent1 = new Event(42, "end", 2.0);
+ final Event startEvent1 = new Event(42, "start", 2.0);
+ final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
+ final Event endEvent1 = new Event(42, "end", 2.0);
- harness.processElement(new StreamRecord<Event>(startEvent1, 21));
- harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
+ harness.processElement(new StreamRecord<Event>(startEvent1, 21));
+ harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
- // simulate snapshot/restore with some elements in internal sorting queue
- OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
- harness.close();
+ // simulate snapshot/restore with some elements in internal sorting queue
+ OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+ harness.close();
- harness = new KeyedOneInputStreamOperatorTestHarness<>(
- new KeyedCEPPatternOperator<>(
- Event.createTypeSerializer(),
- false,
- ByteSerializer.INSTANCE,
- new NFAFactory(),
- false),
- keySelector,
- BasicTypeInfo.BYTE_TYPE_INFO);
-
- harness.setup();
- harness.initializeState(snapshot);
- harness.open();
+ harness = new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ ByteSerializer.INSTANCE,
+ new NFAFactory(),
+ false),
+ keySelector,
+ BasicTypeInfo.BYTE_TYPE_INFO);
- harness.processElement(new StreamRecord<>(endEvent1, 25));
+ harness.setup();
+ harness.initializeState(snapshot);
+ harness.open();
- harness.processWatermark(new Watermark(50));
+ harness.processElement(new StreamRecord<>(endEvent1, 25));
- result = harness.getOutput();
+ harness.processWatermark(new Watermark(50));
- // watermark and the result
- assertEquals(2, result.size());
+ result = harness.getOutput();
- Object resultObject1 = result.poll();
- assertTrue(resultObject1 instanceof StreamRecord);
- StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
- assertTrue(resultRecord1.getValue() instanceof Map);
+ // watermark and the result
+ assertEquals(2, result.size());
- @SuppressWarnings("unchecked")
- Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
+ Object resultObject1 = result.poll();
+ assertTrue(resultObject1 instanceof StreamRecord);
+ StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+ assertTrue(resultRecord1.getValue() instanceof Map);
- assertEquals(startEvent1, patternMap1.get("start").get(0));
- assertEquals(middleEvent1, patternMap1.get("middle").get(0));
- assertEquals(endEvent1, patternMap1.get("end").get(0));
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap1 =
+ (Map<String, List<Event>>) resultRecord1.getValue();
- harness.close();
+ assertEquals(startEvent1, patternMap1.get("start").get(0));
+ assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+ assertEquals(endEvent1, patternMap1.get("end").get(0));
+ } finally {
+ harness.close();
+ }
}
private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
http://git-wip-us.apache.org/repos/asf/flink/blob/38c45f80/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index bf1436d..38ad0f1 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -67,15 +67,17 @@ public class CEPOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
- harness.open();
-
- Watermark expectedWatermark = new Watermark(42L);
+ try {
+ harness.open();
- harness.processWatermark(expectedWatermark);
+ Watermark expectedWatermark = new Watermark(42L);
- verifyWatermark(harness.getOutput().poll(), 42L);
+ harness.processWatermark(expectedWatermark);
- harness.close();
+ verifyWatermark(harness.getOutput().poll(), 42L);
+ } finally {
+ harness.close();
+ }
}
@Test
@@ -83,59 +85,62 @@ public class CEPOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
- harness.open();
-
- Event startEvent = new Event(42, "start", 1.0);
- SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
- Event endEvent= new Event(42, "end", 1.0);
+ try {
+ harness.open();
- harness.processElement(new StreamRecord<>(startEvent, 1L));
- harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
+ Event startEvent = new Event(42, "start", 1.0);
+ SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+ Event endEvent = new Event(42, "end", 1.0);
- // simulate snapshot/restore with some elements in internal sorting queue
- OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
- harness.close();
+ harness.processElement(new StreamRecord<>(startEvent, 1L));
+ harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
- harness = getCepTestHarness(false);
+ // simulate snapshot/restore with some elements in internal sorting queue
+ OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+ harness.close();
- harness.setup();
- harness.initializeState(snapshot);
- harness.open();
+ harness = getCepTestHarness(false);
- harness.processWatermark(new Watermark(Long.MIN_VALUE));
+ harness.setup();
+ harness.initializeState(snapshot);
+ harness.open();
- harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
+ harness.processWatermark(new Watermark(Long.MIN_VALUE));
- // if element timestamps are not correctly checkpointed/restored this will lead to
- // a pruning time underflow exception in NFA
- harness.processWatermark(new Watermark(2L));
+ harness
+ .processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
- harness.processElement(new StreamRecord<Event>(middleEvent, 3L));
- harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4L));
- harness.processElement(new StreamRecord<>(endEvent, 5L));
+ // if element timestamps are not correctly checkpointed/restored this will lead to
+ // a pruning time underflow exception in NFA
+ harness.processWatermark(new Watermark(2L));
- // simulate snapshot/restore with empty element queue but NFA state
- OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L);
- harness.close();
+ harness.processElement(new StreamRecord<Event>(middleEvent, 3L));
+ harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4L));
+ harness.processElement(new StreamRecord<>(endEvent, 5L));
- harness = getCepTestHarness(false);
+ // simulate snapshot/restore with empty element queue but NFA state
+ OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L);
+ harness.close();
- harness.setup();
- harness.initializeState(snapshot2);
- harness.open();
+ harness = getCepTestHarness(false);
- harness.processWatermark(new Watermark(Long.MAX_VALUE));
+ harness.setup();
+ harness.initializeState(snapshot2);
+ harness.open();
- // get and verify the output
+ harness.processWatermark(new Watermark(Long.MAX_VALUE));
- Queue<Object> result = harness.getOutput();
+ // get and verify the output
- assertEquals(2, result.size());
+ Queue<Object> result = harness.getOutput();
- verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
- verifyWatermark(result.poll(), Long.MAX_VALUE);
+ assertEquals(2, result.size());
- harness.close();
+ verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
+ verifyWatermark(result.poll(), Long.MAX_VALUE);
+ } finally {
+ harness.close();
+ }
}
@Test
@@ -147,68 +152,71 @@ public class CEPOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
- harness.setStateBackend(rocksDBStateBackend);
-
- harness.open();
+ try {
+ harness.setStateBackend(rocksDBStateBackend);
- Event startEvent = new Event(42, "start", 1.0);
- SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
- Event endEvent= new Event(42, "end", 1.0);
+ harness.open();
- harness.processElement(new StreamRecord<>(startEvent, 1L));
- harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
+ Event startEvent = new Event(42, "start", 1.0);
+ SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+ Event endEvent = new Event(42, "end", 1.0);
- // simulate snapshot/restore with some elements in internal sorting queue
- OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
- harness.close();
+ harness.processElement(new StreamRecord<>(startEvent, 1L));
+ harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
- harness = getCepTestHarness(false);
+ // simulate snapshot/restore with some elements in internal sorting queue
+ OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+ harness.close();
- rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
- rocksDBStateBackend.setDbStoragePath(rocksDbPath);
- harness.setStateBackend(rocksDBStateBackend);
+ harness = getCepTestHarness(false);
- harness.setup();
- harness.initializeState(snapshot);
- harness.open();
+ rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
+ rocksDBStateBackend.setDbStoragePath(rocksDbPath);
+ harness.setStateBackend(rocksDBStateBackend);
- harness.processWatermark(new Watermark(Long.MIN_VALUE));
+ harness.setup();
+ harness.initializeState(snapshot);
+ harness.open();
- harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
+ harness.processWatermark(new Watermark(Long.MIN_VALUE));
- // if element timestamps are not correctly checkpointed/restored this will lead to
- // a pruning time underflow exception in NFA
- harness.processWatermark(new Watermark(2L));
+ harness
+ .processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
- // simulate snapshot/restore with empty element queue but NFA state
- OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L);
- harness.close();
+ // if element timestamps are not correctly checkpointed/restored this will lead to
+ // a pruning time underflow exception in NFA
+ harness.processWatermark(new Watermark(2L));
- harness = getCepTestHarness(false);
+ // simulate snapshot/restore with empty element queue but NFA state
+ OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L);
+ harness.close();
- rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
- rocksDBStateBackend.setDbStoragePath(rocksDbPath);
- harness.setStateBackend(rocksDBStateBackend);
- harness.setup();
- harness.initializeState(snapshot2);
- harness.open();
+ harness = getCepTestHarness(false);
- harness.processElement(new StreamRecord<Event>(middleEvent, 3L));
- harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4L));
- harness.processElement(new StreamRecord<>(endEvent, 5L));
+ rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
+ rocksDBStateBackend.setDbStoragePath(rocksDbPath);
+ harness.setStateBackend(rocksDBStateBackend);
+ harness.setup();
+ harness.initializeState(snapshot2);
+ harness.open();
- harness.processWatermark(new Watermark(Long.MAX_VALUE));
+ harness.processElement(new StreamRecord<Event>(middleEvent, 3L));
+ harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4L));
+ harness.processElement(new StreamRecord<>(endEvent, 5L));
- // get and verify the output
+ harness.processWatermark(new Watermark(Long.MAX_VALUE));
- Queue<Object> result = harness.getOutput();
+ // get and verify the output
- assertEquals(2, result.size());
+ Queue<Object> result = harness.getOutput();
- verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
- verifyWatermark(result.poll(), Long.MAX_VALUE);
+ assertEquals(2, result.size());
- harness.close();
+ verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
+ verifyWatermark(result.poll(), Long.MAX_VALUE);
+ } finally {
+ harness.close();
+ }
}
/**
@@ -299,85 +307,88 @@ public class CEPOperatorTest extends TestLogger {
KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(false);
OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
- harness.open();
-
- harness.processWatermark(new Watermark(Long.MIN_VALUE));
-
- harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
- harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
- harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
- harness.processElement(new StreamRecord<>(startEvent1, 1L));
- harness.processElement(new StreamRecord<>(startEventK2, 1L));
-
- // there must be 2 keys 42, 43 registered for the watermark callback
- // all the seen elements must be in the priority queues but no NFA yet.
-
- assertEquals(2L, harness.numEventTimeTimers());
- assertEquals(4L, operator.getPQSize(42));
- assertEquals(1L, operator.getPQSize(43));
- assertTrue(!operator.hasNonEmptyNFA(42));
- assertTrue(!operator.hasNonEmptyNFA(43));
-
- harness.processWatermark(new Watermark(2L));
-
- verifyWatermark(harness.getOutput().poll(), Long.MIN_VALUE);
- verifyWatermark(harness.getOutput().poll(), 2L);
-
- // still the 2 keys
- // one element in PQ for 42 (the barfoo) as it arrived early
- // for 43 the element entered the NFA and the PQ is empty
-
- assertEquals(2L, harness.numEventTimeTimers());
- assertTrue(operator.hasNonEmptyNFA(42));
- assertEquals(1L, operator.getPQSize(42));
- assertTrue(operator.hasNonEmptyNFA(43));
- assertTrue(!operator.hasNonEmptyPQ(43));
-
- harness.processElement(new StreamRecord<>(startEvent2, 4L));
- harness.processElement(new StreamRecord<Event>(middleEvent2, 5L));
-
- OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
- harness.close();
-
- KeyedCEPPatternOperator<Event, Integer> operator2 = getKeyedCepOpearator(false);
- harness = getCepTestHarness(operator2);
- harness.setup();
- harness.initializeState(snapshot);
- harness.open();
-
- harness.processElement(new StreamRecord<>(endEvent1, 6L));
- harness.processWatermark(11L);
- harness.processWatermark(12L);
-
- // now we have 1 key because the 43 expired and was removed.
- // 42 is still there due to startEvent2
- assertEquals(1L, harness.numEventTimeTimers());
- assertTrue(operator2.hasNonEmptyNFA(42));
- assertTrue(!operator2.hasNonEmptyPQ(42));
- assertTrue(!operator2.hasNonEmptyNFA(43));
- assertTrue(!operator2.hasNonEmptyPQ(43));
-
- verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
- verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1);
- verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1);
- verifyWatermark(harness.getOutput().poll(), 11L);
- verifyWatermark(harness.getOutput().poll(), 12L);
-
- harness.processElement(new StreamRecord<Event>(middleEvent3, 12L));
- harness.processElement(new StreamRecord<>(endEvent2, 13L));
- harness.processWatermark(20L);
- harness.processWatermark(21L);
-
- assertTrue(!operator2.hasNonEmptyNFA(42));
- assertTrue(!operator2.hasNonEmptyPQ(42));
- assertEquals(0L, harness.numEventTimeTimers());
-
- verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
- verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2);
- verifyWatermark(harness.getOutput().poll(), 20L);
- verifyWatermark(harness.getOutput().poll(), 21L);
-
- harness.close();
+ try {
+ harness.open();
+
+ harness.processWatermark(new Watermark(Long.MIN_VALUE));
+
+ harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
+ harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
+ harness
+ .processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
+ harness.processElement(new StreamRecord<>(startEvent1, 1L));
+ harness.processElement(new StreamRecord<>(startEventK2, 1L));
+
+ // there must be 2 keys 42, 43 registered for the watermark callback
+ // all the seen elements must be in the priority queues but no NFA yet.
+
+ assertEquals(2L, harness.numEventTimeTimers());
+ assertEquals(4L, operator.getPQSize(42));
+ assertEquals(1L, operator.getPQSize(43));
+ assertTrue(!operator.hasNonEmptyNFA(42));
+ assertTrue(!operator.hasNonEmptyNFA(43));
+
+ harness.processWatermark(new Watermark(2L));
+
+ verifyWatermark(harness.getOutput().poll(), Long.MIN_VALUE);
+ verifyWatermark(harness.getOutput().poll(), 2L);
+
+ // still the 2 keys
+ // one element in PQ for 42 (the barfoo) as it arrived early
+ // for 43 the element entered the NFA and the PQ is empty
+
+ assertEquals(2L, harness.numEventTimeTimers());
+ assertTrue(operator.hasNonEmptyNFA(42));
+ assertEquals(1L, operator.getPQSize(42));
+ assertTrue(operator.hasNonEmptyNFA(43));
+ assertTrue(!operator.hasNonEmptyPQ(43));
+
+ harness.processElement(new StreamRecord<>(startEvent2, 4L));
+ harness.processElement(new StreamRecord<Event>(middleEvent2, 5L));
+
+ OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+ harness.close();
+
+ KeyedCEPPatternOperator<Event, Integer> operator2 = getKeyedCepOpearator(false);
+ harness = getCepTestHarness(operator2);
+ harness.setup();
+ harness.initializeState(snapshot);
+ harness.open();
+
+ harness.processElement(new StreamRecord<>(endEvent1, 6L));
+ harness.processWatermark(11L);
+ harness.processWatermark(12L);
+
+ // now we have 1 key because the 43 expired and was removed.
+ // 42 is still there due to startEvent2
+ assertEquals(1L, harness.numEventTimeTimers());
+ assertTrue(operator2.hasNonEmptyNFA(42));
+ assertTrue(!operator2.hasNonEmptyPQ(42));
+ assertTrue(!operator2.hasNonEmptyNFA(43));
+ assertTrue(!operator2.hasNonEmptyPQ(43));
+
+ verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
+ verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1);
+ verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1);
+ verifyWatermark(harness.getOutput().poll(), 11L);
+ verifyWatermark(harness.getOutput().poll(), 12L);
+
+ harness.processElement(new StreamRecord<Event>(middleEvent3, 12L));
+ harness.processElement(new StreamRecord<>(endEvent2, 13L));
+ harness.processWatermark(20L);
+ harness.processWatermark(21L);
+
+ assertTrue(!operator2.hasNonEmptyNFA(42));
+ assertTrue(!operator2.hasNonEmptyPQ(42));
+ assertEquals(0L, harness.numEventTimeTimers());
+
+ verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
+ verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2);
+ verifyWatermark(harness.getOutput().poll(), 20L);
+ verifyWatermark(harness.getOutput().poll(), 21L);
+ } finally {
+ harness.close();
+ }
}
@Test
@@ -397,49 +408,51 @@ public class CEPOperatorTest extends TestLogger {
true);
OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
- harness.open();
+ try {
+ harness.open();
- harness.processWatermark(new Watermark(Long.MIN_VALUE));
+ harness.processWatermark(new Watermark(Long.MIN_VALUE));
- harness.processElement(new StreamRecord<>(middle2Event1, 6));
- harness.processElement(new StreamRecord<>(middle1Event3, 7));
- harness.processElement(new StreamRecord<>(startEvent, 1));
- harness.processElement(new StreamRecord<>(middle1Event1, 3));
- harness.processElement(new StreamRecord<>(middle1Event2, 3));
- harness.processElement(new StreamRecord<>(middle1Event1, 3));
- harness.processElement(new StreamRecord<>(new Event(41, "d", 6.0), 5));
+ harness.processElement(new StreamRecord<>(middle2Event1, 6));
+ harness.processElement(new StreamRecord<>(middle1Event3, 7));
+ harness.processElement(new StreamRecord<>(startEvent, 1));
+ harness.processElement(new StreamRecord<>(middle1Event1, 3));
+ harness.processElement(new StreamRecord<>(middle1Event2, 3));
+ harness.processElement(new StreamRecord<>(middle1Event1, 3));
+ harness.processElement(new StreamRecord<>(new Event(41, "d", 6.0), 5));
- assertEquals(1L, harness.numEventTimeTimers());
- assertEquals(7L, operator.getPQSize(41));
- assertTrue(!operator.hasNonEmptyNFA(41));
+ assertEquals(1L, harness.numEventTimeTimers());
+ assertEquals(7L, operator.getPQSize(41));
+ assertTrue(!operator.hasNonEmptyNFA(41));
- harness.processWatermark(new Watermark(2L));
+ harness.processWatermark(new Watermark(2L));
- verifyWatermark(harness.getOutput().poll(), Long.MIN_VALUE);
- verifyWatermark(harness.getOutput().poll(), 2L);
+ verifyWatermark(harness.getOutput().poll(), Long.MIN_VALUE);
+ verifyWatermark(harness.getOutput().poll(), 2L);
- assertEquals(1L, harness.numEventTimeTimers());
- assertEquals(6L, operator.getPQSize(41));
- assertTrue(operator.hasNonEmptyNFA(41)); // processed the first element
+ assertEquals(1L, harness.numEventTimeTimers());
+ assertEquals(6L, operator.getPQSize(41));
+ assertTrue(operator.hasNonEmptyNFA(41)); // processed the first element
- harness.processWatermark(new Watermark(8L));
+ harness.processWatermark(new Watermark(8L));
- List<List<Event>> resultingPatterns = new ArrayList<>();
- while (!harness.getOutput().isEmpty()) {
- Object o = harness.getOutput().poll();
- if (!(o instanceof Watermark)) {
- StreamRecord<Map<String, List<Event>>> el = (StreamRecord<Map<String, List<Event>>>) o;
- List<Event> res = new ArrayList<>();
- for (List<Event> le: el.getValue().values()) {
- res.addAll(le);
+ List<List<Event>> resultingPatterns = new ArrayList<>();
+ while (!harness.getOutput().isEmpty()) {
+ Object o = harness.getOutput().poll();
+ if (!(o instanceof Watermark)) {
+ StreamRecord<Map<String, List<Event>>> el =
+ (StreamRecord<Map<String, List<Event>>>) o;
+ List<Event> res = new ArrayList<>();
+ for (List<Event> le : el.getValue().values()) {
+ res.addAll(le);
+ }
+ resultingPatterns.add(res);
+ } else {
+ verifyWatermark(o, 8L);
}
- resultingPatterns.add(res);
- } else {
- verifyWatermark(o, 8L);
}
- }
- compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middle1Event1),
Lists.newArrayList(startEvent, middle1Event1, middle1Event2),
@@ -448,24 +461,28 @@ public class CEPOperatorTest extends TestLogger {
Lists.newArrayList(startEvent, middle1Event1, middle1Event2, middle1Event1),
Lists.newArrayList(startEvent, middle1Event1, middle2Event1, middle1Event3),
- Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle1Event3),
- Lists.newArrayList(startEvent, middle1Event1, middle1Event2, middle2Event1, middle1Event3),
+ Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2,
+ middle1Event3),
+ Lists.newArrayList(startEvent, middle1Event1, middle1Event2, middle2Event1,
+ middle1Event3),
- Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle2Event1, middle1Event3)
- ));
+ Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2,
+ middle2Event1, middle1Event3)
+ ));
- assertEquals(1L, harness.numEventTimeTimers());
- assertEquals(0L, operator.getPQSize(41));
- assertTrue(operator.hasNonEmptyNFA(41));
+ assertEquals(1L, harness.numEventTimeTimers());
+ assertEquals(0L, operator.getPQSize(41));
+ assertTrue(operator.hasNonEmptyNFA(41));
- harness.processWatermark(new Watermark(17L));
- verifyWatermark(harness.getOutput().poll(), 17L);
+ harness.processWatermark(new Watermark(17L));
+ verifyWatermark(harness.getOutput().poll(), 17L);
- assertTrue(!operator.hasNonEmptyNFA(41));
- assertTrue(!operator.hasNonEmptyPQ(41));
- assertEquals(0L, harness.numEventTimeTimers());
-
- harness.close();
+ assertTrue(!operator.hasNonEmptyNFA(41));
+ assertTrue(!operator.hasNonEmptyPQ(41));
+ assertEquals(0L, harness.numEventTimeTimers());
+ } finally {
+ harness.close();
+ }
}
@Test
@@ -484,70 +501,73 @@ public class CEPOperatorTest extends TestLogger {
KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(true);
OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
- harness.open();
-
- harness.setProcessingTime(0L);
+ try {
+ harness.open();
- harness.processElement(new StreamRecord<>(startEvent1, 1L));
- harness.processElement(new StreamRecord<>(startEventK2, 1L));
- harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
- harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
- harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
+ harness.setProcessingTime(0L);
- assertTrue(!operator.hasNonEmptyPQ(42));
- assertTrue(!operator.hasNonEmptyPQ(43));
- assertTrue(operator.hasNonEmptyNFA(42));
- assertTrue(operator.hasNonEmptyNFA(43));
+ harness.processElement(new StreamRecord<>(startEvent1, 1L));
+ harness.processElement(new StreamRecord<>(startEventK2, 1L));
+ harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
+ harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
+ harness
+ .processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
- harness.setProcessingTime(3L);
+ assertTrue(!operator.hasNonEmptyPQ(42));
+ assertTrue(!operator.hasNonEmptyPQ(43));
+ assertTrue(operator.hasNonEmptyNFA(42));
+ assertTrue(operator.hasNonEmptyNFA(43));
- harness.processElement(new StreamRecord<>(startEvent2, 3L));
- harness.processElement(new StreamRecord<Event>(middleEvent2, 4L));
+ harness.setProcessingTime(3L);
- OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
- harness.close();
+ harness.processElement(new StreamRecord<>(startEvent2, 3L));
+ harness.processElement(new StreamRecord<Event>(middleEvent2, 4L));
- KeyedCEPPatternOperator<Event, Integer> operator2 = getKeyedCepOpearator(true);
- harness = getCepTestHarness(operator2);
- harness.setup();
- harness.initializeState(snapshot);
- harness.open();
+ OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+ harness.close();
- harness.setProcessingTime(3L);
- harness.processElement(new StreamRecord<>(endEvent1, 5L));
+ KeyedCEPPatternOperator<Event, Integer> operator2 = getKeyedCepOpearator(true);
+ harness = getCepTestHarness(operator2);
+ harness.setup();
+ harness.initializeState(snapshot);
+ harness.open();
- verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
- verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1);
- verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1);
+ harness.setProcessingTime(3L);
+ harness.processElement(new StreamRecord<>(endEvent1, 5L));
- harness.setProcessingTime(11L);
+ verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
+ verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1);
+ verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1);
- harness.processElement(new StreamRecord<Event>(middleEvent3, 11L));
- harness.processElement(new StreamRecord<>(endEvent2, 12L));
+ harness.setProcessingTime(11L);
- verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
- verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2);
+ harness.processElement(new StreamRecord<Event>(middleEvent3, 11L));
+ harness.processElement(new StreamRecord<>(endEvent2, 12L));
- harness.setProcessingTime(21L);
+ verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
+ verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2);
- assertTrue(operator2.hasNonEmptyNFA(42));
+ harness.setProcessingTime(21L);
- harness.processElement(new StreamRecord<>(startEvent1, 21L));
- assertTrue(operator2.hasNonEmptyNFA(42));
+ assertTrue(operator2.hasNonEmptyNFA(42));
- harness.setProcessingTime(49L);
+ harness.processElement(new StreamRecord<>(startEvent1, 21L));
+ assertTrue(operator2.hasNonEmptyNFA(42));
- // TODO: 3/13/17 we have to have another event in order to clean up
- harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
+ harness.setProcessingTime(49L);
- // the pattern expired
- assertTrue(!operator2.hasNonEmptyNFA(42));
+ // TODO: 3/13/17 we have to have another event in order to clean up
+ harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
- assertEquals(0L, harness.numEventTimeTimers());
- assertTrue(!operator2.hasNonEmptyPQ(42));
- assertTrue(!operator2.hasNonEmptyPQ(43));
+ // the pattern expired
+ assertTrue(!operator2.hasNonEmptyNFA(42));
- harness.close();
+ assertEquals(0L, harness.numEventTimeTimers());
+ assertTrue(!operator2.hasNonEmptyPQ(42));
+ assertTrue(!operator2.hasNonEmptyPQ(43));
+ } finally {
+ harness.close();
+ }
}
@Test
@@ -614,47 +634,53 @@ public class CEPOperatorTest extends TestLogger {
true);
OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
- harness.setStateBackend(rocksDBStateBackend);
- harness.open();
-
- harness.processWatermark(0L);
- harness.processElement(new StreamRecord<>(startEvent1, 1));
- harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
- harness.processWatermark(2L);
- harness.processElement(new StreamRecord<Event>(middleEvent3, 5));
- harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
- harness.processElement(new StreamRecord<>(startEvent2, 4));
- harness.processWatermark(5L);
- harness.processElement(new StreamRecord<>(nextOne, 6));
- harness.processElement(new StreamRecord<>(endEvent, 8));
- harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
- harness.processWatermark(100L);
-
- List<List<Event>> resultingPatterns = new ArrayList<>();
- while (!harness.getOutput().isEmpty()) {
- Object o = harness.getOutput().poll();
- if (!(o instanceof Watermark)) {
- StreamRecord<Map<String, List<Event>>> el = (StreamRecord<Map<String, List<Event>>>) o;
- List<Event> res = new ArrayList<>();
- for (List<Event> le: el.getValue().values()) {
- res.addAll(le);
+
+ try {
+ harness.setStateBackend(rocksDBStateBackend);
+ harness.open();
+
+ harness.processWatermark(0L);
+ harness.processElement(new StreamRecord<>(startEvent1, 1));
+ harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+ harness.processWatermark(2L);
+ harness.processElement(new StreamRecord<Event>(middleEvent3, 5));
+ harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
+ harness.processElement(new StreamRecord<>(startEvent2, 4));
+ harness.processWatermark(5L);
+ harness.processElement(new StreamRecord<>(nextOne, 6));
+ harness.processElement(new StreamRecord<>(endEvent, 8));
+ harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
+ harness.processWatermark(100L);
+
+ List<List<Event>> resultingPatterns = new ArrayList<>();
+ while (!harness.getOutput().isEmpty()) {
+ Object o = harness.getOutput().poll();
+ if (!(o instanceof Watermark)) {
+ StreamRecord<Map<String, List<Event>>> el =
+ (StreamRecord<Map<String, List<Event>>>) o;
+ List<Event> res = new ArrayList<>();
+ for (List<Event> le : el.getValue().values()) {
+ res.addAll(le);
+ }
+ resultingPatterns.add(res);
}
- resultingPatterns.add(res);
}
- }
- compareMaps(resultingPatterns,
+ compareMaps(resultingPatterns,
Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
- Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
- Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1),
- Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
- Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1),
- Lists.newArrayList(startEvent1, endEvent, middleEvent1),
- Lists.newArrayList(startEvent2, endEvent, middleEvent3)
+ Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2,
+ middleEvent4),
+ Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
+ Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1),
+ Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
+ Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1),
+ Lists.newArrayList(startEvent1, endEvent, middleEvent1),
+ Lists.newArrayList(startEvent2, endEvent, middleEvent3)
)
- );
- harness.close();
+ );
+ } finally {
+ harness.close();
+ }
}
private void verifyWatermark(Object outputObject, long timestamp) {
http://git-wip-us.apache.org/repos/asf/flink/blob/38c45f80/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 45d7215..86be09c 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -79,70 +79,86 @@ public class CEPRescalingTest {
assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keygroup));
// now we start the test, we go from parallelism 1 to 2.
+ OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = null;
+ OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness1 = null;
+ OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness2 = null;
- OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
- getTestHarness(maxParallelism, 1, 0);
- harness.open();
+ try {
+ harness = getTestHarness(maxParallelism, 1, 0);
+ harness.open();
- harness.processElement(new StreamRecord<>(startEvent1, 1)); // valid element
- harness.processElement(new StreamRecord<>(new Event(7, "foobar", 1.0), 2));
+ harness.processElement(
+ new StreamRecord<>(startEvent1, 1)); // valid element
+ harness.processElement(new StreamRecord<>(new Event(7, "foobar", 1.0), 2));
- harness.processElement(new StreamRecord<>(startEvent2, 3)); // valid element
- harness.processElement(new StreamRecord<Event>(middleEvent2, 4)); // valid element
+ harness.processElement(
+ new StreamRecord<>(startEvent2, 3)); // valid element
+ harness.processElement(
+ new StreamRecord<Event>(middleEvent2, 4)); // valid element
- // take a snapshot with some elements in internal sorting queue
- OperatorStateHandles snapshot = harness.snapshot(0, 0);
- harness.close();
+ // take a snapshot with some elements in internal sorting queue
+ OperatorStateHandles snapshot = harness.snapshot(0, 0);
+ harness.close();
- // initialize two sub-tasks with the previously snapshotted state to simulate scaling up
+ // initialize two sub-tasks with the previously snapshotted state to simulate scaling up
- // we know that the valid element will go to index 0,
- // so we initialize the two tasks and we put the rest of
- // the valid elements for the pattern on task 0.
+ // we know that the valid element will go to index 0,
+ // so we initialize the two tasks and we put the rest of
+ // the valid elements for the pattern on task 0.
- OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness1 =
- getTestHarness(maxParallelism, 2, 0);
+ harness1 = getTestHarness(maxParallelism, 2, 0);
- harness1.setup();
- harness1.initializeState(snapshot);
- harness1.open();
+ harness1.setup();
+ harness1.initializeState(snapshot);
+ harness1.open();
- // if element timestamps are not correctly checkpointed/restored this will lead to
- // a pruning time underflow exception in NFA
- harness1.processWatermark(new Watermark(2));
+ // if element timestamps are not correctly checkpointed/restored this will lead to
+ // a pruning time underflow exception in NFA
+ harness1.processWatermark(new Watermark(2));
- harness1.processElement(new StreamRecord<Event>(middleEvent1, 3)); // valid element
- harness1.processElement(new StreamRecord<>(endEvent1, 5)); // valid element
+ harness1.processElement(
+ new StreamRecord<Event>(middleEvent1, 3)); // valid element
+ harness1.processElement(
+ new StreamRecord<>(endEvent1, 5)); // valid element
- harness1.processWatermark(new Watermark(Long.MAX_VALUE));
+ harness1.processWatermark(new Watermark(Long.MAX_VALUE));
- // watermarks and the result
- assertEquals(3, harness1.getOutput().size());
- verifyWatermark(harness1.getOutput().poll(), 2);
- verifyPattern(harness1.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
+ // watermarks and the result
+ assertEquals(3, harness1.getOutput().size());
+ verifyWatermark(harness1.getOutput().poll(), 2);
+ verifyPattern(harness1.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
- OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness2 =
- getTestHarness(maxParallelism, 2, 1);
+ harness2 = getTestHarness(maxParallelism, 2, 1);
- harness2.setup();
- harness2.initializeState(snapshot);
- harness2.open();
+ harness2.setup();
+ harness2.initializeState(snapshot);
+ harness2.open();
- // now we move to the second parallel task
- harness2.processWatermark(new Watermark(2));
+ // now we move to the second parallel task
+ harness2.processWatermark(new Watermark(2));
- harness2.processElement(new StreamRecord<>(endEvent2, 5));
- harness2.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+ harness2.processElement(new StreamRecord<>(endEvent2, 5));
+ harness2.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
- harness2.processWatermark(new Watermark(Long.MAX_VALUE));
+ harness2.processWatermark(new Watermark(Long.MAX_VALUE));
- assertEquals(3, harness2.getOutput().size());
- verifyWatermark(harness2.getOutput().poll(), 2);
- verifyPattern(harness2.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
+ assertEquals(3, harness2.getOutput().size());
+ verifyWatermark(harness2.getOutput().poll(), 2);
+ verifyPattern(harness2.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
+ } finally {
+ closeSilently(harness);
+ closeSilently(harness1);
+ closeSilently(harness2);
+ }
+ }
- harness.close();
- harness1.close();
- harness2.close();
+ private static void closeSilently(OneInputStreamOperatorTestHarness<?, ?> harness) {
+ if (harness != null) {
+ try {
+ harness.close();
+ } catch (Throwable ignored) {
+ }
+ }
}
@Test
@@ -211,109 +227,120 @@ public class CEPRescalingTest {
getTestHarness(maxParallelism, 3, 2);
harness3.open();
- harness1.processWatermark(Long.MIN_VALUE);
- harness2.processWatermark(Long.MIN_VALUE);
- harness3.processWatermark(Long.MIN_VALUE);
-
- harness1.processElement(new StreamRecord<>(startEvent1, 1)); // valid element
- harness1.processElement(new StreamRecord<>(new Event(7, "foobar", 1.0), 2));
- harness1.processElement(new StreamRecord<Event>(middleEvent1, 3)); // valid element
- harness1.processElement(new StreamRecord<>(endEvent1, 5)); // valid element
-
- // till here we have a valid sequence, so after creating the
- // new instance and sending it a watermark, we expect it to fire,
- // even with no new elements.
-
- harness1.processElement(new StreamRecord<>(startEvent3, 10));
- harness1.processElement(new StreamRecord<>(startEvent1, 10));
+ OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness4 = null;
+ OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness5 = null;
- harness2.processElement(new StreamRecord<>(startEvent2, 7));
- harness2.processElement(new StreamRecord<Event>(middleEvent2, 8));
+ try {
+ harness1.processWatermark(Long.MIN_VALUE);
+ harness2.processWatermark(Long.MIN_VALUE);
+ harness3.processWatermark(Long.MIN_VALUE);
- harness3.processElement(new StreamRecord<>(startEvent4, 15));
- harness3.processElement(new StreamRecord<Event>(middleEvent4, 16));
- harness3.processElement(new StreamRecord<>(endEvent4, 17));
+ harness1.processElement(
+ new StreamRecord<>(startEvent1, 1)); // valid element
+ harness1.processElement(new StreamRecord<>(new Event(7, "foobar", 1.0), 2));
+ harness1.processElement(
+ new StreamRecord<Event>(middleEvent1, 3)); // valid element
+ harness1.processElement(
+ new StreamRecord<>(endEvent1, 5)); // valid element
- // so far we only have the initial watermark
- assertEquals(1, harness1.getOutput().size());
- verifyWatermark(harness1.getOutput().poll(), Long.MIN_VALUE);
+ // till here we have a valid sequence, so after creating the
+ // new instance and sending it a watermark, we expect it to fire,
+ // even with no new elements.
- assertEquals(1, harness2.getOutput().size());
- verifyWatermark(harness2.getOutput().poll(), Long.MIN_VALUE);
+ harness1.processElement(new StreamRecord<>(startEvent3, 10));
+ harness1.processElement(new StreamRecord<>(startEvent1, 10));
- assertEquals(1, harness3.getOutput().size());
- verifyWatermark(harness3.getOutput().poll(), Long.MIN_VALUE);
+ harness2.processElement(new StreamRecord<>(startEvent2, 7));
+ harness2.processElement(new StreamRecord<Event>(middleEvent2, 8));
- // we take a snapshot and make it look as a single operator
- // this will be the initial state of all downstream tasks.
- OperatorStateHandles snapshot = AbstractStreamOperatorTestHarness.repackageState(
- harness2.snapshot(0, 0),
- harness1.snapshot(0, 0),
- harness3.snapshot(0, 0)
- );
+ harness3.processElement(new StreamRecord<>(startEvent4, 15));
+ harness3.processElement(new StreamRecord<Event>(middleEvent4, 16));
+ harness3.processElement(new StreamRecord<>(endEvent4, 17));
- OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness4 =
- getTestHarness(maxParallelism, 2, 0);
- harness4.setup();
- harness4.initializeState(snapshot);
- harness4.open();
+ // so far we only have the initial watermark
+ assertEquals(1, harness1.getOutput().size());
+ verifyWatermark(harness1.getOutput().poll(), Long.MIN_VALUE);
- OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness5 =
- getTestHarness(maxParallelism, 2, 1);
- harness5.setup();
- harness5.initializeState(snapshot);
- harness5.open();
+ assertEquals(1, harness2.getOutput().size());
+ verifyWatermark(harness2.getOutput().poll(), Long.MIN_VALUE);
- harness5.processElement(new StreamRecord<>(endEvent2, 11));
- harness5.processWatermark(new Watermark(12));
+ assertEquals(1, harness3.getOutput().size());
+ verifyWatermark(harness3.getOutput().poll(), Long.MIN_VALUE);
- verifyPattern(harness5.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
- verifyWatermark(harness5.getOutput().poll(), 12);
+ // we take a snapshot and make it look as a single operator
+ // this will be the initial state of all downstream tasks.
+ OperatorStateHandles snapshot = AbstractStreamOperatorTestHarness.repackageState(
+ harness2.snapshot(0, 0),
+ harness1.snapshot(0, 0),
+ harness3.snapshot(0, 0)
+ );
- // if element timestamps are not correctly checkpointed/restored this will lead to
- // a pruning time underflow exception in NFA
- harness4.processWatermark(new Watermark(12));
+ harness4 = getTestHarness(maxParallelism, 2, 0);
+ harness4.setup();
+ harness4.initializeState(snapshot);
+ harness4.open();
- assertEquals(2, harness4.getOutput().size());
- verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
- verifyWatermark(harness4.getOutput().poll(), 12);
+ harness5 = getTestHarness(maxParallelism, 2, 1);
+ harness5.setup();
+ harness5.initializeState(snapshot);
+ harness5.open();
- harness4.processElement(new StreamRecord<Event>(middleEvent3, 15)); // valid element
- harness4.processElement(new StreamRecord<>(endEvent3, 16)); // valid element
+ harness5.processElement(new StreamRecord<>(endEvent2, 11));
+ harness5.processWatermark(new Watermark(12));
- harness4.processElement(new StreamRecord<Event>(middleEvent1, 15)); // valid element
- harness4.processElement(new StreamRecord<>(endEvent1, 16)); // valid element
+ verifyPattern(harness5.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
+ verifyWatermark(harness5.getOutput().poll(), 12);
- harness4.processWatermark(new Watermark(Long.MAX_VALUE));
- harness5.processWatermark(new Watermark(Long.MAX_VALUE));
+ // if element timestamps are not correctly checkpointed/restored this will lead to
+ // a pruning time underflow exception in NFA
+ harness4.processWatermark(new Watermark(12));
- // verify result
- assertEquals(3, harness4.getOutput().size());
-
- // check the order of the events in the output
- Queue<Object> output = harness4.getOutput();
- StreamRecord<?> resultRecord = (StreamRecord<?>) output.peek();
- assertTrue(resultRecord.getValue() instanceof Map);
-
- @SuppressWarnings("unchecked")
- Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
- if (patternMap.get("start").get(0).getId() == 7) {
+ assertEquals(2, harness4.getOutput().size());
verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
- verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3);
- } else {
- verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3);
- verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
- }
-
- // after scaling down this should end up here
- assertEquals(2, harness5.getOutput().size());
- verifyPattern(harness5.getOutput().poll(), startEvent4, middleEvent4, endEvent4);
+ verifyWatermark(harness4.getOutput().poll(), 12);
+
+ harness4.processElement(
+ new StreamRecord<Event>(middleEvent3, 15)); // valid element
+ harness4.processElement(
+ new StreamRecord<>(endEvent3, 16)); // valid element
+
+ harness4.processElement(
+ new StreamRecord<Event>(middleEvent1, 15)); // valid element
+ harness4.processElement(
+ new StreamRecord<>(endEvent1, 16)); // valid element
+
+ harness4.processWatermark(new Watermark(Long.MAX_VALUE));
+ harness5.processWatermark(new Watermark(Long.MAX_VALUE));
+
+ // verify result
+ assertEquals(3, harness4.getOutput().size());
+
+ // check the order of the events in the output
+ Queue<Object> output = harness4.getOutput();
+ StreamRecord<?> resultRecord = (StreamRecord<?>) output.peek();
+ assertTrue(resultRecord.getValue() instanceof Map);
+
+ @SuppressWarnings("unchecked")
+ Map<String, List<Event>> patternMap =
+ (Map<String, List<Event>>) resultRecord.getValue();
+ if (patternMap.get("start").get(0).getId() == 7) {
+ verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
+ verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3);
+ } else {
+ verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3);
+ verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
+ }
- harness1.close();
- harness2.close();
- harness3.close();
- harness4.close();
- harness5.close();
+ // after scaling down this should end up here
+ assertEquals(2, harness5.getOutput().size());
+ verifyPattern(harness5.getOutput().poll(), startEvent4, middleEvent4, endEvent4);
+ } finally {
+ closeSilently(harness1);
+ closeSilently(harness2);
+ closeSilently(harness3);
+ closeSilently(harness4);
+ closeSilently(harness5);
+ }
}
private void verifyWatermark(Object outputObject, long timestamp) {