You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 11:52:07 UTC
[11/11] flink git commit: [FLINK-5969] Add CEPFrom12MigrationTest
[FLINK-5969] Add CEPFrom12MigrationTest
The binary snapshots have been created on the Flink 1.2 branch.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/852a710b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/852a710b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/852a710b
Branch: refs/heads/release-1.2
Commit: 852a710b4d91bdd319238c87d227c51a904070a7
Parents: 53432e0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Apr 28 12:28:50 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 13:50:05 2017 +0200
----------------------------------------------------------------------
.../cep/operator/CEPFrom12MigrationTest.java | 480 +++++++++++++++++++
...-migration-after-branching-flink1.2-snapshot | Bin 0 -> 5580 bytes
...-single-pattern-afterwards-flink1.2-snapshot | Bin 0 -> 2326 bytes
...ation-starting-new-pattern-flink1.2-snapshot | Bin 0 -> 5389 bytes
4 files changed, 480 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/852a710b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
new file mode 100644
index 0000000..9a15754
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for checking whether CEP operator can restore from snapshots that were done
+ * using the Flink 1.2 operator.
+ *
+ * <p>For regenerating the binary snapshot file you have to run the {@code write*()} method on
+ * the Flink 1.2 branch.
+ */
+
+public class CEPFrom12MigrationTest {
+
+ /**
+ * Manually run this to write binary snapshot data.
+ */
+ @Ignore
+ @Test
+ public void writAfterBranchingPatternSnapshot() throws Exception {
+
+ KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+ private static final long serialVersionUID = -4873366487571254798L;
+
+ @Override
+ public Integer getKey(Event value) throws Exception {
+ return value.getId();
+ }
+ };
+
+ final Event startEvent = new Event(42, "start", 1.0);
+ final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
+ final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
+
+ OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ keySelector,
+ IntSerializer.INSTANCE,
+ new NFAFactory()),
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ harness.setup();
+ harness.open();
+
+ harness.processElement(new StreamRecord<Event>(startEvent, 1));
+ harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+ harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+ harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+ harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
+
+ harness.processWatermark(new Watermark(5));
+
+ // do snapshot and save to file
+ OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+ OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-after-branching-flink1.2-snapshot");
+
+ harness.close();
+ }
+
+ @Test
+ public void testRestoreAfterBranchingPattern() throws Exception {
+
+ KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+ private static final long serialVersionUID = -4873366487571254798L;
+
+ @Override
+ public Integer getKey(Event value) throws Exception {
+ return value.getId();
+ }
+ };
+
+ final Event startEvent = new Event(42, "start", 1.0);
+ final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
+ final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
+ final Event endEvent = new Event(42, "end", 1.0);
+
+ OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ keySelector,
+ IntSerializer.INSTANCE,
+ new NFAFactory()),
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ harness.setup();
+ harness.initializeState(
+ OperatorSnapshotUtil.readStateHandle(
+ OperatorSnapshotUtil.getResourceFilename("cep-migration-after-branching-flink1.2-snapshot")));
+ harness.open();
+
+ harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+ harness.processElement(new StreamRecord<>(endEvent, 5));
+
+ harness.processWatermark(new Watermark(20));
+
+ ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+ // watermark and 2 results
+ assertEquals(3, result.size());
+
+ Object resultObject1 = result.poll();
+ assertTrue(resultObject1 instanceof StreamRecord);
+ StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+ assertTrue(resultRecord1.getValue() instanceof Map);
+
+ Object resultObject2 = result.poll();
+ assertTrue(resultObject2 instanceof StreamRecord);
+ StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+ assertTrue(resultRecord2.getValue() instanceof Map);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue();
+
+ assertEquals(startEvent, patternMap1.get("start"));
+ assertEquals(middleEvent1, patternMap1.get("middle"));
+ assertEquals(endEvent, patternMap1.get("end"));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue();
+
+ assertEquals(startEvent, patternMap2.get("start"));
+ assertEquals(middleEvent2, patternMap2.get("middle"));
+ assertEquals(endEvent, patternMap2.get("end"));
+
+ harness.close();
+ }
+
+ /**
+ * Manually run this to write binary snapshot data.
+ */
+ @Ignore
+ @Test
+ public void writeStartingNewPatternAfterMigrationSnapshot() throws Exception {
+
+ KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+ private static final long serialVersionUID = -4873366487571254798L;
+
+ @Override
+ public Integer getKey(Event value) throws Exception {
+ return value.getId();
+ }
+ };
+
+ final Event startEvent1 = new Event(42, "start", 1.0);
+ final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
+
+ OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ keySelector,
+ IntSerializer.INSTANCE,
+ new NFAFactory()),
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ harness.setup();
+ harness.open();
+ harness.processElement(new StreamRecord<Event>(startEvent1, 1));
+ harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+ harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+ harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+ harness.processWatermark(new Watermark(5));
+
+ // do snapshot and save to file
+ OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+ OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot");
+
+ harness.close();
+ }
+
+ @Test
+ public void testRestoreStartingNewPatternAfterMigration() throws Exception {
+
+ KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+ private static final long serialVersionUID = -4873366487571254798L;
+
+ @Override
+ public Integer getKey(Event value) throws Exception {
+ return value.getId();
+ }
+ };
+
+ final Event startEvent1 = new Event(42, "start", 1.0);
+ final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
+ final Event startEvent2 = new Event(42, "start", 5.0);
+ final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
+ final Event endEvent = new Event(42, "end", 1.0);
+
+ OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ keySelector,
+ IntSerializer.INSTANCE,
+ new NFAFactory()),
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ harness.setup();
+ harness.initializeState(
+ OperatorSnapshotUtil.readStateHandle(
+ OperatorSnapshotUtil.getResourceFilename("cep-migration-starting-new-pattern-flink1.2-snapshot")));
+ harness.open();
+
+ harness.processElement(new StreamRecord<>(startEvent2, 5));
+ harness.processElement(new StreamRecord<Event>(middleEvent2, 6));
+ harness.processElement(new StreamRecord<>(endEvent, 7));
+
+ harness.processWatermark(new Watermark(20));
+
+ ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+ // watermark and 3 results
+ assertEquals(4, result.size());
+
+ Object resultObject1 = result.poll();
+ assertTrue(resultObject1 instanceof StreamRecord);
+ StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+ assertTrue(resultRecord1.getValue() instanceof Map);
+
+ Object resultObject2 = result.poll();
+ assertTrue(resultObject2 instanceof StreamRecord);
+ StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+ assertTrue(resultRecord2.getValue() instanceof Map);
+
+ Object resultObject3 = result.poll();
+ assertTrue(resultObject3 instanceof StreamRecord);
+ StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
+ assertTrue(resultRecord3.getValue() instanceof Map);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue();
+
+ assertEquals(startEvent1, patternMap1.get("start"));
+ assertEquals(middleEvent1, patternMap1.get("middle"));
+ assertEquals(endEvent, patternMap1.get("end"));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue();
+
+ assertEquals(startEvent1, patternMap2.get("start"));
+ assertEquals(middleEvent2, patternMap2.get("middle"));
+ assertEquals(endEvent, patternMap2.get("end"));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Event> patternMap3 = (Map<String, Event>) resultRecord3.getValue();
+
+ assertEquals(startEvent2, patternMap3.get("start"));
+ assertEquals(middleEvent2, patternMap3.get("middle"));
+ assertEquals(endEvent, patternMap3.get("end"));
+
+ harness.close();
+ }
+
+ /**
+ * Manually run this to write binary snapshot data.
+ */
+ @Ignore
+ @Test
+ public void writeSinglePatternAfterMigrationSnapshot() throws Exception {
+
+ KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+ private static final long serialVersionUID = -4873366487571254798L;
+
+ @Override
+ public Integer getKey(Event value) throws Exception {
+ return value.getId();
+ }
+ };
+
+ final Event startEvent1 = new Event(42, "start", 1.0);
+
+ OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ keySelector,
+ IntSerializer.INSTANCE,
+ new SinglePatternNFAFactory()),
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ harness.setup();
+ harness.open();
+ harness.processWatermark(new Watermark(5));
+
+ // do snapshot and save to file
+ OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+ OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot");
+
+ harness.close();
+ }
+
+
+ @Test
+ public void testSinglePatternAfterMigration() throws Exception {
+
+ KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+ private static final long serialVersionUID = -4873366487571254798L;
+
+ @Override
+ public Integer getKey(Event value) throws Exception {
+ return value.getId();
+ }
+ };
+
+ final Event startEvent1 = new Event(42, "start", 1.0);
+
+ OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ keySelector,
+ IntSerializer.INSTANCE,
+ new SinglePatternNFAFactory()),
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ harness.setup();
+ harness.initializeState(
+ OperatorSnapshotUtil.readStateHandle(
+ OperatorSnapshotUtil.getResourceFilename("cep-migration-single-pattern-afterwards-flink1.2-snapshot")));
+ harness.open();
+
+ harness.processElement(new StreamRecord<>(startEvent1, 5));
+
+ harness.processWatermark(new Watermark(20));
+
+ ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+ // watermark and the result
+ assertEquals(2, result.size());
+
+ Object resultObject = result.poll();
+ assertTrue(resultObject instanceof StreamRecord);
+ StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+ assertTrue(resultRecord.getValue() instanceof Map);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+
+ assertEquals(startEvent1, patternMap.get("start"));
+
+ harness.close();
+ }
+
+ private static class SinglePatternNFAFactory implements NFACompiler.NFAFactory<Event> {
+
+ private static final long serialVersionUID = 1173020762472766713L;
+
+ private final boolean handleTimeout;
+
+ private SinglePatternNFAFactory() {
+ this(false);
+ }
+
+ private SinglePatternNFAFactory(boolean handleTimeout) {
+ this.handleTimeout = handleTimeout;
+ }
+
+ @Override
+ public NFA<Event> createNFA() {
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
+ .within(Time.milliseconds(10L));
+
+ return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+ }
+ }
+
+ private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
+
+ private static final long serialVersionUID = 1173020762472766713L;
+
+ private final boolean handleTimeout;
+
+ private NFAFactory() {
+ this(false);
+ }
+
+ private NFAFactory(boolean handleTimeout) {
+ this.handleTimeout = handleTimeout;
+ }
+
+ @Override
+ public NFA<Event> createNFA() {
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
+ .followedBy("middle")
+ .subtype(SubEvent.class)
+ .where(new MiddleFilter())
+ .followedBy("end")
+ .where(new EndFilter())
+ // add a window timeout to test whether timestamps of elements in the
+ // priority queue in CEP operator are correctly checkpointed/restored
+ .within(Time.milliseconds(10L));
+
+ return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+ }
+ }
+
+ private static class StartFilter implements FilterFunction<Event> {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("start");
+ }
+ }
+
+ private static class MiddleFilter implements FilterFunction<SubEvent> {
+ private static final long serialVersionUID = 6215754202506583964L;
+
+ @Override
+ public boolean filter(SubEvent value) throws Exception {
+ return value.getVolume() > 5.0;
+ }
+ }
+
+ private static class EndFilter implements FilterFunction<Event> {
+ private static final long serialVersionUID = 7056763917392056548L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("end");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/852a710b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot
new file mode 100644
index 0000000..6775f2a
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/852a710b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot
new file mode 100644
index 0000000..f63b7dd
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/852a710b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot
new file mode 100644
index 0000000..8e0fd27
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot differ