You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/07/30 09:42:41 UTC

[flink] 01/02: [FLINK-8993] [tests] Let general purpose DataStream job use KryoSerializer via type extraction

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0d6040b1f44e3884b97e8eaef063c25a663c52cc
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jul 25 14:58:46 2018 +0800

    [FLINK-8993] [tests] Let general purpose DataStream job use KryoSerializer via type extraction
    
    This closes #6413.
---
 .../tests/DataStreamAllroundTestJobFactory.java    | 45 +++++++++++++++++-----
 .../tests/DataStreamAllroundTestProgram.java       |  3 +-
 .../builder/ArtificialListStateBuilder.java        | 13 +++----
 .../builder/ArtificialValueStateBuilder.java       | 12 +++---
 .../tests/StatefulStreamJobUpgradeTestProgram.java | 25 ++++++------
 5 files changed, 63 insertions(+), 35 deletions(-)

diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 1238072..fb92960 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigOption;
@@ -382,13 +384,38 @@ class DataStreamAllroundTestJobFactory {
 	static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> createArtificialKeyedStateMapper(
 		MapFunction<IN, OUT> mapFunction,
 		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
-		List<TypeSerializer<STATE>> stateSerializers) {
+		List<TypeSerializer<STATE>> stateSerializers,
+		List<Class<STATE>> stateClasses) {
 
 		List<ArtificialStateBuilder<IN>> artificialStateBuilders = new ArrayList<>(stateSerializers.size());
 		for (TypeSerializer<STATE> typeSerializer : stateSerializers) {
-			artificialStateBuilders.add(createValueStateBuilder(inputAndOldStateToNewState, typeSerializer));
-			artificialStateBuilders.add(createListStateBuilder(inputAndOldStateToNewState, typeSerializer));
+			artificialStateBuilders.add(createValueStateBuilder(
+				inputAndOldStateToNewState,
+				new ValueStateDescriptor<>(
+					"valueState-" + typeSerializer.getClass().getSimpleName(),
+					typeSerializer)));
+
+			artificialStateBuilders.add(createListStateBuilder(
+				inputAndOldStateToNewState,
+				new ListStateDescriptor<>(
+					"listState-" + typeSerializer.getClass().getSimpleName(),
+					typeSerializer)));
 		}
+
+		for (Class<STATE> stateClass : stateClasses) {
+			artificialStateBuilders.add(createValueStateBuilder(
+				inputAndOldStateToNewState,
+				new ValueStateDescriptor<>(
+					"valueState-" + stateClass.getSimpleName(),
+					stateClass)));
+
+			artificialStateBuilders.add(createListStateBuilder(
+				inputAndOldStateToNewState,
+				new ListStateDescriptor<>(
+					"listState-" + stateClass.getSimpleName(),
+					stateClass)));
+		}
+
 		return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders);
 	}
 
@@ -400,17 +427,17 @@ class DataStreamAllroundTestJobFactory {
 
 	static <IN, STATE> ArtificialStateBuilder<IN> createValueStateBuilder(
 		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
-		TypeSerializer<STATE> typeSerializer) {
+		ValueStateDescriptor<STATE> valueStateDescriptor) {
 
 		return new ArtificialValueStateBuilder<>(
-			"valueState-" + typeSerializer.getClass().getSimpleName(),
+			valueStateDescriptor.getName(),
 			inputAndOldStateToNewState,
-			typeSerializer);
+			valueStateDescriptor);
 	}
 
 	static <IN, STATE> ArtificialStateBuilder<IN> createListStateBuilder(
 		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
-		TypeSerializer<STATE> typeSerializer) {
+		ListStateDescriptor<STATE> listStateDescriptor) {
 
 		JoinFunction<IN, Iterable<STATE>, List<STATE>> listStateGenerator = (first, second) -> {
 			List<STATE> newState = new ArrayList<>();
@@ -421,9 +448,9 @@ class DataStreamAllroundTestJobFactory {
 		};
 
 		return new ArtificialListStateBuilder<>(
-			"listState-" + typeSerializer.getClass().getSimpleName(),
+			listStateDescriptor.getName(),
 			listStateGenerator,
 			listStateGenerator,
-			typeSerializer);
+			listStateDescriptor);
 	}
 }
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index ea90e65..30c1c24 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -86,7 +86,8 @@ public class DataStreamAllroundTestProgram {
 							return new ComplexPayload(first, KEYED_STATE_OPER_NAME);
 						},
 					Collections.singletonList(
-						new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
+						new KryoSerializer<>(ComplexPayload.class, env.getConfig())), // custom KryoSerializer
+					Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction
 				)
 			)
 			.name(KEYED_STATE_OPER_NAME)
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
index a2c6387..b29e535 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.tests.artificialstate.builder;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 
@@ -35,7 +35,7 @@ public class ArtificialListStateBuilder<IN, STATE> extends ArtificialStateBuilde
 
 	private transient ListState<STATE> listOperatorState;
 	private transient ListState<STATE> listKeyedState;
-	private final TypeSerializer<STATE> typeSerializer;
+	private final ListStateDescriptor<STATE> listStateDescriptor;
 	private final JoinFunction<IN, Iterable<STATE>, List<STATE>> keyedStateGenerator;
 	private final JoinFunction<IN, Iterable<STATE>, List<STATE>> operatorStateGenerator;
 
@@ -43,11 +43,11 @@ public class ArtificialListStateBuilder<IN, STATE> extends ArtificialStateBuilde
 		String stateName,
 		JoinFunction<IN, Iterable<STATE>, List<STATE>> keyedStateGenerator,
 		JoinFunction<IN, Iterable<STATE>, List<STATE>> operatorStateGenerator,
-		TypeSerializer<STATE> typeSerializer) {
+		ListStateDescriptor<STATE> listStateDescriptor) {
 		super(stateName);
-		this.typeSerializer = typeSerializer;
-		this.keyedStateGenerator = keyedStateGenerator;
-		this.operatorStateGenerator = operatorStateGenerator;
+		this.listStateDescriptor = Preconditions.checkNotNull(listStateDescriptor);
+		this.keyedStateGenerator = Preconditions.checkNotNull(keyedStateGenerator);
+		this.operatorStateGenerator = Preconditions.checkNotNull(operatorStateGenerator);
 	}
 
 	@Override
@@ -58,7 +58,6 @@ public class ArtificialListStateBuilder<IN, STATE> extends ArtificialStateBuilde
 
 	@Override
 	public void initialize(FunctionInitializationContext initializationContext) throws Exception {
-		ListStateDescriptor<STATE> listStateDescriptor = new ListStateDescriptor<>(stateName, typeSerializer);
 		listOperatorState = initializationContext.getOperatorStateStore().getListState(listStateDescriptor);
 		listKeyedState = initializationContext.getKeyedStateStore().getListState(listStateDescriptor);
 	}
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
index 6d74e09..421a682 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.tests.artificialstate.builder;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.util.Preconditions;
 
 /**
  * An {@link ArtificialStateBuilder} for user {@link ValueState}s.
@@ -32,16 +32,16 @@ public class ArtificialValueStateBuilder<IN, STATE> extends ArtificialStateBuild
 	private static final long serialVersionUID = -1205814329756790916L;
 
 	private transient ValueState<STATE> valueState;
-	private final TypeSerializer<STATE> typeSerializer;
+	private final ValueStateDescriptor<STATE> valueStateDescriptor;
 	private final JoinFunction<IN, STATE, STATE> stateValueGenerator;
 
 	public ArtificialValueStateBuilder(
 		String stateName,
 		JoinFunction<IN, STATE, STATE> stateValueGenerator,
-		TypeSerializer<STATE> typeSerializer) {
+		ValueStateDescriptor<STATE> valueStateDescriptor) {
 		super(stateName);
-		this.typeSerializer = typeSerializer;
-		this.stateValueGenerator = stateValueGenerator;
+		this.valueStateDescriptor = Preconditions.checkNotNull(valueStateDescriptor);
+		this.stateValueGenerator = Preconditions.checkNotNull(stateValueGenerator);
 	}
 
 	@Override
@@ -51,8 +51,6 @@ public class ArtificialValueStateBuilder<IN, STATE> extends ArtificialStateBuild
 
 	@Override
 	public void initialize(FunctionInitializationContext initializationContext) {
-		ValueStateDescriptor<STATE> valueStateDescriptor =
-			new ValueStateDescriptor<>(stateName, typeSerializer);
 		valueState = initializationContext.getKeyedStateStore().getState(valueStateDescriptor);
 	}
 }
diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
index 0b3b5ed..4f77f95 100644
--- a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
+++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
@@ -84,8 +84,8 @@ public class StatefulStreamJobUpgradeTestProgram {
 			Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
 
 		KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
-			applyOriginalStatefulOperations(source, stateSer) :
-			applyUpgradedStatefulOperations(source, stateSer);
+			applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
+			applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
 
 		afterStatefulOperations
 			.flatMap(createSemanticsCheckMapper(pt))
@@ -109,26 +109,29 @@ public class StatefulStreamJobUpgradeTestProgram {
 
 	private static KeyedStream<Event, Integer> applyOriginalStatefulOperations(
 		KeyedStream<Event, Integer> source,
-		List<TypeSerializer<ComplexPayload>> stateSer) {
-		source = applyTestStatefulOperator("stateMap1", simpleStateUpdate("stateMap1"), source, stateSer);
-		return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer);
+		List<TypeSerializer<ComplexPayload>> stateSer,
+		List<Class<ComplexPayload>> stateClass) {
+		source = applyTestStatefulOperator("stateMap1", simpleStateUpdate("stateMap1"), source, stateSer, stateClass);
+		return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer, stateClass);
 	}
 
 	private static KeyedStream<Event, Integer> applyUpgradedStatefulOperations(
 		KeyedStream<Event, Integer> source,
-		List<TypeSerializer<ComplexPayload>> stateSer) {
-		source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer);
-		source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer);
-		return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer);
+		List<TypeSerializer<ComplexPayload>> stateSer,
+		List<Class<ComplexPayload>> stateClass) {
+		source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer, stateClass);
+		source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer, stateClass);
+		return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer, stateClass);
 	}
 
 	private static KeyedStream<Event, Integer> applyTestStatefulOperator(
 		String name,
 		JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
 		KeyedStream<Event, Integer> source,
-		List<TypeSerializer<ComplexPayload>> stateSer) {
+		List<TypeSerializer<ComplexPayload>> stateSer,
+		List<Class<ComplexPayload>> stateClass) {
 		return source
-			.map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer))
+			.map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
 			.name(name)
 			.uid(name)
 			.returns(Event.class)