You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/16 04:46:07 UTC

[flink] 02/02: [FLINK-17727][e2e] Re-enable "[FLINK-17467][task][e2e] Modify existing upgrade test to verify aligned savepoints" after a fix

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 91932ccc0592f969dc7d264f2adaecfa0c65d3da
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri May 15 14:51:17 2020 +0200

    [FLINK-17727][e2e] Re-enable "[FLINK-17467][task][e2e] Modify existing upgrade test to verify aligned savepoints" after a fix
    
    This reverts commit 3af17562eb791e3f486c38dbd94dc3328309b262.
---
 .../tests/StatefulStreamJobUpgradeTestProgram.java | 113 +++++++++++++++++++--
 1 file changed, 102 insertions(+), 11 deletions(-)

diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
index 3cec4b8..aa94578 100644
--- a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
+++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.tests;
 
 import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.api.java.utils.ParameterTool;
@@ -73,7 +74,17 @@ public class StatefulStreamJobUpgradeTestProgram {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		setupEnvironment(env, pt);
+		env.getCheckpointConfig().enableUnalignedCheckpoints();
 
+		if (isOriginalJobVariant(pt)) {
+			executeOriginalVariant(env, pt);
+		}
+		else {
+			executeUpgradedVariant(env, pt);
+		}
+	}
+
+	private static void executeOriginalVariant(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
 		KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
 			.name("EventSource")
 			.uid("EventSource")
@@ -83,11 +94,34 @@ public class StatefulStreamJobUpgradeTestProgram {
 		List<TypeSerializer<ComplexPayload>> stateSer =
 			Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
 
-		KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
-			applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
+		KeyedStream<Event, Integer> afterStatefulOperations =
+			applyOriginalStatefulOperations(source, stateSer, Collections.emptyList());
+
+		afterStatefulOperations
+			.flatMap(createSemanticsCheckMapper(pt))
+			.name("SemanticsCheckMapper")
+			.addSink(new PrintSinkFunction<>());
+
+		env.execute("General purpose test job");
+	}
+
+	private static void executeUpgradedVariant(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
+		KeyedStream<UpgradedEvent, Integer> source = env.addSource(createEventSource(pt))
+			.name("EventSource")
+			.uid("EventSource")
+			.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+			.map(new UpgradeEvent())
+			.keyBy(UpgradedEvent::getKey);
+
+		List<TypeSerializer<ComplexPayload>> stateSer =
+			Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
+
+		KeyedStream<UpgradedEvent, Integer> afterStatefulOperations =
 			applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
 
 		afterStatefulOperations
+			.map(new DowngradeEvent())
+			.keyBy(Event::getKey)
 			.flatMap(createSemanticsCheckMapper(pt))
 			.name("SemanticsCheckMapper")
 			.addSink(new PrintSinkFunction<>());
@@ -115,15 +149,6 @@ public class StatefulStreamJobUpgradeTestProgram {
 		return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer, stateClass);
 	}
 
-	private static KeyedStream<Event, Integer> applyUpgradedStatefulOperations(
-			KeyedStream<Event, Integer> source,
-			List<TypeSerializer<ComplexPayload>> stateSer,
-			List<Class<ComplexPayload>> stateClass) {
-		source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer, stateClass);
-		source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer, stateClass);
-		return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer, stateClass);
-	}
-
 	private static KeyedStream<Event, Integer> applyTestStatefulOperator(
 			String name,
 			JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
@@ -138,6 +163,29 @@ public class StatefulStreamJobUpgradeTestProgram {
 			.keyBy(Event::getKey);
 	}
 
+	private static KeyedStream<UpgradedEvent, Integer> applyUpgradedStatefulOperations(
+			KeyedStream<UpgradedEvent, Integer> source,
+			List<TypeSerializer<ComplexPayload>> stateSer,
+			List<Class<ComplexPayload>> stateClass) {
+		source = applyUpgradedTestStatefulOperator("stateMap2", simpleUpgradedStateUpdate("stateMap2"), source, stateSer, stateClass);
+		source = applyUpgradedTestStatefulOperator("stateMap1", lastUpgradedStateUpdate("stateMap1"), source, stateSer, stateClass);
+		return applyUpgradedTestStatefulOperator("stateMap3", simpleUpgradedStateUpdate("stateMap3"), source, stateSer, stateClass);
+	}
+
+	private static KeyedStream<UpgradedEvent, Integer> applyUpgradedTestStatefulOperator(
+			String name,
+			JoinFunction<UpgradedEvent, ComplexPayload, ComplexPayload> stateFunc,
+			KeyedStream<UpgradedEvent, Integer> source,
+			List<TypeSerializer<ComplexPayload>> stateSer,
+			List<Class<ComplexPayload>> stateClass) {
+		return source
+			.map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
+			.name(name)
+			.uid(name)
+			.returns(UpgradedEvent.class)
+			.keyBy(UpgradedEvent::getKey);
+	}
+
 	private static JoinFunction<Event, ComplexPayload, ComplexPayload> simpleStateUpdate(String strPayload) {
 		return (Event first, ComplexPayload second) -> {
 			verifyState(strPayload, second);
@@ -145,6 +193,13 @@ public class StatefulStreamJobUpgradeTestProgram {
 		};
 	}
 
+	private static JoinFunction<UpgradedEvent, ComplexPayload, ComplexPayload> simpleUpgradedStateUpdate(String strPayload) {
+		return (UpgradedEvent first, ComplexPayload second) -> {
+			verifyState(strPayload, second);
+			return new ComplexPayload(first.event, strPayload);
+		};
+	}
+
 	private static JoinFunction<Event, ComplexPayload, ComplexPayload> lastStateUpdate(String strPayload) {
 		return (Event first, ComplexPayload second) -> {
 			verifyState(strPayload, second);
@@ -153,9 +208,45 @@ public class StatefulStreamJobUpgradeTestProgram {
 		};
 	}
 
+	private static JoinFunction<UpgradedEvent, ComplexPayload, ComplexPayload> lastUpgradedStateUpdate(String strPayload) {
+		return (UpgradedEvent first, ComplexPayload second) -> {
+			verifyState(strPayload, second);
+			boolean isLastEvent = second != null && first.event.getEventTime() <= second.getEventTime();
+			return isLastEvent ? second : new ComplexPayload(first.event, strPayload);
+		};
+	}
+
 	private static void verifyState(String strPayload, ComplexPayload state) {
 		if (state != null && !state.getStrPayload().equals(strPayload)) {
 			System.out.println("State is set or restored incorrectly");
 		}
 	}
+
+	private static class UpgradeEvent implements MapFunction<Event, UpgradedEvent> {
+		@Override
+		public UpgradedEvent map(Event event) {
+			return new UpgradedEvent(event);
+		}
+	}
+
+	private static class UpgradedEvent {
+		public final Event event;
+		public final String randomPayload;
+
+		public UpgradedEvent(Event event) {
+			this.event = event;
+			this.randomPayload = event.getPayload().toUpperCase();
+		}
+
+		public int getKey() {
+			return event.getKey();
+		}
+	}
+
+	private static class DowngradeEvent implements MapFunction<UpgradedEvent, Event> {
+		@Override
+		public Event map(UpgradedEvent value) throws Exception {
+			return value.event;
+		}
+	}
 }