You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/16 04:46:07 UTC
[flink] 02/02: [FLINK-17727][e2e] Re-enable
"[FLINK-17467][task][e2e] Modify existing upgrade test to verify aligned
savepoints" after a fix
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 91932ccc0592f969dc7d264f2adaecfa0c65d3da
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri May 15 14:51:17 2020 +0200
[FLINK-17727][e2e] Re-enable "[FLINK-17467][task][e2e] Modify existing upgrade test to verify aligned savepoints" after a fix
This reverts commit 3af17562eb791e3f486c38dbd94dc3328309b262.
---
.../tests/StatefulStreamJobUpgradeTestProgram.java | 113 +++++++++++++++++++--
1 file changed, 102 insertions(+), 11 deletions(-)
diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
index 3cec4b8..aa94578 100644
--- a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
+++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.tests;
import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.utils.ParameterTool;
@@ -73,7 +74,17 @@ public class StatefulStreamJobUpgradeTestProgram {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
setupEnvironment(env, pt);
+ env.getCheckpointConfig().enableUnalignedCheckpoints();
+ if (isOriginalJobVariant(pt)) {
+ executeOriginalVariant(env, pt);
+ }
+ else {
+ executeUpgradedVariant(env, pt);
+ }
+ }
+
+ private static void executeOriginalVariant(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
.name("EventSource")
.uid("EventSource")
@@ -83,11 +94,34 @@ public class StatefulStreamJobUpgradeTestProgram {
List<TypeSerializer<ComplexPayload>> stateSer =
Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
- KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
- applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
+ KeyedStream<Event, Integer> afterStatefulOperations =
+ applyOriginalStatefulOperations(source, stateSer, Collections.emptyList());
+
+ afterStatefulOperations
+ .flatMap(createSemanticsCheckMapper(pt))
+ .name("SemanticsCheckMapper")
+ .addSink(new PrintSinkFunction<>());
+
+ env.execute("General purpose test job");
+ }
+
+ private static void executeUpgradedVariant(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
+ KeyedStream<UpgradedEvent, Integer> source = env.addSource(createEventSource(pt))
+ .name("EventSource")
+ .uid("EventSource")
+ .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+ .map(new UpgradeEvent())
+ .keyBy(UpgradedEvent::getKey);
+
+ List<TypeSerializer<ComplexPayload>> stateSer =
+ Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
+
+ KeyedStream<UpgradedEvent, Integer> afterStatefulOperations =
applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
afterStatefulOperations
+ .map(new DowngradeEvent())
+ .keyBy(Event::getKey)
.flatMap(createSemanticsCheckMapper(pt))
.name("SemanticsCheckMapper")
.addSink(new PrintSinkFunction<>());
@@ -115,15 +149,6 @@ public class StatefulStreamJobUpgradeTestProgram {
return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer, stateClass);
}
- private static KeyedStream<Event, Integer> applyUpgradedStatefulOperations(
- KeyedStream<Event, Integer> source,
- List<TypeSerializer<ComplexPayload>> stateSer,
- List<Class<ComplexPayload>> stateClass) {
- source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer, stateClass);
- source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer, stateClass);
- return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer, stateClass);
- }
-
private static KeyedStream<Event, Integer> applyTestStatefulOperator(
String name,
JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
@@ -138,6 +163,29 @@ public class StatefulStreamJobUpgradeTestProgram {
.keyBy(Event::getKey);
}
+ private static KeyedStream<UpgradedEvent, Integer> applyUpgradedStatefulOperations(
+ KeyedStream<UpgradedEvent, Integer> source,
+ List<TypeSerializer<ComplexPayload>> stateSer,
+ List<Class<ComplexPayload>> stateClass) {
+ source = applyUpgradedTestStatefulOperator("stateMap2", simpleUpgradedStateUpdate("stateMap2"), source, stateSer, stateClass);
+ source = applyUpgradedTestStatefulOperator("stateMap1", lastUpgradedStateUpdate("stateMap1"), source, stateSer, stateClass);
+ return applyUpgradedTestStatefulOperator("stateMap3", simpleUpgradedStateUpdate("stateMap3"), source, stateSer, stateClass);
+ }
+
+ private static KeyedStream<UpgradedEvent, Integer> applyUpgradedTestStatefulOperator(
+ String name,
+ JoinFunction<UpgradedEvent, ComplexPayload, ComplexPayload> stateFunc,
+ KeyedStream<UpgradedEvent, Integer> source,
+ List<TypeSerializer<ComplexPayload>> stateSer,
+ List<Class<ComplexPayload>> stateClass) {
+ return source
+ .map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
+ .name(name)
+ .uid(name)
+ .returns(UpgradedEvent.class)
+ .keyBy(UpgradedEvent::getKey);
+ }
+
private static JoinFunction<Event, ComplexPayload, ComplexPayload> simpleStateUpdate(String strPayload) {
return (Event first, ComplexPayload second) -> {
verifyState(strPayload, second);
@@ -145,6 +193,13 @@ public class StatefulStreamJobUpgradeTestProgram {
};
}
+ private static JoinFunction<UpgradedEvent, ComplexPayload, ComplexPayload> simpleUpgradedStateUpdate(String strPayload) {
+ return (UpgradedEvent first, ComplexPayload second) -> {
+ verifyState(strPayload, second);
+ return new ComplexPayload(first.event, strPayload);
+ };
+ }
+
private static JoinFunction<Event, ComplexPayload, ComplexPayload> lastStateUpdate(String strPayload) {
return (Event first, ComplexPayload second) -> {
verifyState(strPayload, second);
@@ -153,9 +208,45 @@ public class StatefulStreamJobUpgradeTestProgram {
};
}
+ private static JoinFunction<UpgradedEvent, ComplexPayload, ComplexPayload> lastUpgradedStateUpdate(String strPayload) {
+ return (UpgradedEvent first, ComplexPayload second) -> {
+ verifyState(strPayload, second);
+ boolean isLastEvent = second != null && first.event.getEventTime() <= second.getEventTime();
+ return isLastEvent ? second : new ComplexPayload(first.event, strPayload);
+ };
+ }
+
private static void verifyState(String strPayload, ComplexPayload state) {
if (state != null && !state.getStrPayload().equals(strPayload)) {
System.out.println("State is set or restored incorrectly");
}
}
+
+ private static class UpgradeEvent implements MapFunction<Event, UpgradedEvent> {
+ @Override
+ public UpgradedEvent map(Event event) {
+ return new UpgradedEvent(event);
+ }
+ }
+
+ private static class UpgradedEvent {
+ public final Event event;
+ public final String randomPayload;
+
+ public UpgradedEvent(Event event) {
+ this.event = event;
+ this.randomPayload = event.getPayload().toUpperCase();
+ }
+
+ public int getKey() {
+ return event.getKey();
+ }
+ }
+
+ private static class DowngradeEvent implements MapFunction<UpgradedEvent, Event> {
+ @Override
+ public Event map(UpgradedEvent value) throws Exception {
+ return value.event;
+ }
+ }
}