You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/07/30 09:44:10 UTC

[flink] branch release-1.6 updated (2a217a5 -> 10ab67f)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 2a217a5  [FLINK-8974] Run all-round DataSet job with failures in HA mode
     new 1ff691e  [FLINK-8993] [tests] Let general purpose DataStream job uses KryoSerializer via type extraction
     new 10ab67f  [FLINK-8994] [tests] Let general purpose DataStream job include Avro as state

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitignore                                         |  1 +
 .../flink-datastream-allround-test/pom.xml         | 34 ++++++++++++++++
 .../src/main/avro/ComplexPayloadAvro.avsc}         | 37 ++++++++++++++---
 .../tests/DataStreamAllroundTestJobFactory.java    | 45 ++++++++++++++++-----
 .../tests/DataStreamAllroundTestProgram.java       | 46 ++++++++++++++++++----
 .../builder/ArtificialListStateBuilder.java        | 13 +++---
 .../builder/ArtificialValueStateBuilder.java       | 12 +++---
 .../tests/StatefulStreamJobUpgradeTestProgram.java | 25 ++++++------
 .../test-scripts/test_resume_savepoint.sh          |  4 +-
 9 files changed, 168 insertions(+), 49 deletions(-)
 copy flink-end-to-end-tests/{flink-confluent-schema-registry/src/main/avro/user.avsc => flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc} (55%)


[flink] 02/02: [FLINK-8994] [tests] Let general purpose DataStream job include Avro as state

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 10ab67f7c9644815c56d3d1729c25a8c1c7ef31c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Jul 27 13:04:26 2018 +0800

    [FLINK-8994] [tests] Let general purpose DataStream job include Avro as state
    
    This closes #6435.
---
 .gitignore                                         |  1 +
 .../flink-datastream-allround-test/pom.xml         | 34 ++++++++++++++
 .../src/main/avro/ComplexPayloadAvro.avsc          | 52 ++++++++++++++++++++++
 .../tests/DataStreamAllroundTestProgram.java       | 43 +++++++++++++++---
 .../test-scripts/test_resume_savepoint.sh          |  4 +-
 5 files changed, 126 insertions(+), 8 deletions(-)

diff --git a/.gitignore b/.gitignore
index 1fde2d9..20749c2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,6 +17,7 @@ tmp
 *.log
 .DS_Store
 build-target
+flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/avro/
 flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/generated/
 flink-runtime-web/web-dashboard/assets/fonts/
 flink-runtime-web/web-dashboard/node_modules/
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml b/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml
index 3ab8779..50990a3 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml
@@ -76,6 +76,40 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+			<plugin>
+				<groupId>org.apache.avro</groupId>
+				<artifactId>avro-maven-plugin</artifactId>
+				<version>${avro.version}</version>
+				<executions>
+					<execution>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>schema</goal>
+						</goals>
+						<configuration>
+							<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+							<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
+							<fieldVisibility>PRIVATE</fieldVisibility>
+							<includes>
+								<include>**/*.avsc</include>
+							</includes>
+							<!--
+							  This forces Avro to use Java Strings instead of Avro's Utf8.
+							  This is required since the job relies on equals checks on some String fields
+							  to verify that state restore was successful.
+							-->
+							<stringType>String</stringType>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-checkstyle-plugin</artifactId>
+				<configuration>
+					<excludes>**/org/apache/flink/streaming/tests/avro/*</excludes>
+				</configuration>
+			</plugin>
 		</plugins>
 	</build>
 
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc
new file mode 100644
index 0000000..15a801e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/avro/ComplexPayloadAvro.avsc
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ {"namespace": "org.apache.flink.streaming.tests.avro",
+ "type": "record",
+ "name": "ComplexPayloadAvro",
+ "fields": [
+     {
+        "name": "eventTime",
+        "type": "long",
+        "default": ""
+     },
+     {
+        "name": "stringList",
+        "type": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            }
+        }
+     },
+     {
+        "name": "strPayload",
+        "type": "string",
+        "default": ""
+     },
+     {
+        "name": "innerPayLoad",
+        "type": {
+            "name": "InnerPayLoadAvro",
+            "type": "record",
+            "fields": [
+                {"name": "sequenceNumber", "type": "long"}
+            ]
+        }
+     }
+ ]
+}
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 30c1c24..513d0cf 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -22,14 +22,18 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.formats.avro.typeutils.AvroSerializer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.tests.artificialstate.ComplexPayload;
+import org.apache.flink.streaming.tests.avro.ComplexPayloadAvro;
+import org.apache.flink.streaming.tests.avro.InnerPayLoadAvro;
 import org.apache.flink.util.Collector;
 
+import java.util.Arrays;
 import java.util.Collections;
 
 import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.applyTumblingWindows;
@@ -72,6 +76,7 @@ public class DataStreamAllroundTestProgram {
 
 		setupEnvironment(env, pt);
 
+		// add a keyed stateful map operator, which uses Kryo for state serialization
 		DataStream<Event> eventStream = env.addSource(createEventSource(pt))
 			.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
 			.keyBy(Event::getKey)
@@ -79,19 +84,45 @@ public class DataStreamAllroundTestProgram {
 					// map function simply forwards the inputs
 					(MapFunction<Event, Event>) in -> in,
 					// state is verified and updated per event as a wrapped ComplexPayload state object
-					(Event first, ComplexPayload second) -> {
-							if (second != null && !second.getStrPayload().equals(KEYED_STATE_OPER_NAME)) {
+					(Event event, ComplexPayload lastState) -> {
+							if (lastState != null && !lastState.getStrPayload().equals(KEYED_STATE_OPER_NAME)
+									&& lastState.getInnerPayLoad().getSequenceNumber() == (event.getSequenceNumber() - 1)) {
 								System.out.println("State is set or restored incorrectly");
 							}
-							return new ComplexPayload(first, KEYED_STATE_OPER_NAME);
+							return new ComplexPayload(event, KEYED_STATE_OPER_NAME);
 						},
 					Collections.singletonList(
 						new KryoSerializer<>(ComplexPayload.class, env.getConfig())), // custom KryoSerializer
 					Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction
 				)
-			)
-			.name(KEYED_STATE_OPER_NAME)
-			.returns(Event.class);
+			).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Kryo");
+
+		// add a keyed stateful map operator, which uses Avro for state serialization
+		eventStream = eventStream
+			.keyBy(Event::getKey)
+			.map(createArtificialKeyedStateMapper(
+					// map function simply forwards the inputs
+					(MapFunction<Event, Event>) in -> in,
+					// state is verified and updated per event as a wrapped ComplexPayloadAvro state object
+					(Event event, ComplexPayloadAvro lastState) -> {
+							if (lastState != null && !lastState.getStrPayload().equals(KEYED_STATE_OPER_NAME)
+									&& lastState.getInnerPayLoad().getSequenceNumber() == (event.getSequenceNumber() - 1)) {
+								System.out.println("State is set or restored incorrectly");
+							}
+
+							ComplexPayloadAvro payload = new ComplexPayloadAvro();
+							payload.setEventTime(event.getEventTime());
+							payload.setInnerPayLoad(new InnerPayLoadAvro(event.getSequenceNumber()));
+							payload.setStrPayload(KEYED_STATE_OPER_NAME);
+							payload.setStringList(Arrays.asList(String.valueOf(event.getKey()), event.getPayload()));
+
+							return payload;
+						},
+					Collections.singletonList(
+						new AvroSerializer<>(ComplexPayloadAvro.class)), // custom AvroSerializer
+					Collections.singletonList(ComplexPayloadAvro.class) // AvroSerializer via type extraction
+				)
+			).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Avro");
 
 		DataStream<Event> eventStream2 = eventStream
 			.map(createArtificialOperatorStateMapper((MapFunction<Event, Event>) in -> in))
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
index fb8c51e..26a7e85 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
@@ -95,7 +95,7 @@ DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR
 
 wait_job_running $DATASTREAM_JOB
 
-wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
+wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
 # take a savepoint of the state machine job
 SAVEPOINT_PATH=$(take_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
@@ -120,7 +120,7 @@ DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TES
 
 wait_job_running $DATASTREAM_JOB
 
-wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
+wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 
 # if state is errorneous and the state machine job produces alerting state transitions,
 # output would be non-empty and the test will not pass


[flink] 01/02: [FLINK-8993] [tests] Let general purpose DataStream job uses KryoSerializer via type extraction

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1ff691e53cddef37364ec612cc56b5e99a358b6f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jul 25 14:58:46 2018 +0800

    [FLINK-8993] [tests] Let general purpose DataStream job uses KryoSerializer via type extraction
    
    This closes #6413.
---
 .../tests/DataStreamAllroundTestJobFactory.java    | 45 +++++++++++++++++-----
 .../tests/DataStreamAllroundTestProgram.java       |  3 +-
 .../builder/ArtificialListStateBuilder.java        | 13 +++----
 .../builder/ArtificialValueStateBuilder.java       | 12 +++---
 .../tests/StatefulStreamJobUpgradeTestProgram.java | 25 ++++++------
 5 files changed, 63 insertions(+), 35 deletions(-)

diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 1238072..fb92960 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigOption;
@@ -382,13 +384,38 @@ class DataStreamAllroundTestJobFactory {
 	static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> createArtificialKeyedStateMapper(
 		MapFunction<IN, OUT> mapFunction,
 		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
-		List<TypeSerializer<STATE>> stateSerializers) {
+		List<TypeSerializer<STATE>> stateSerializers,
+		List<Class<STATE>> stateClasses) {
 
 		List<ArtificialStateBuilder<IN>> artificialStateBuilders = new ArrayList<>(stateSerializers.size());
 		for (TypeSerializer<STATE> typeSerializer : stateSerializers) {
-			artificialStateBuilders.add(createValueStateBuilder(inputAndOldStateToNewState, typeSerializer));
-			artificialStateBuilders.add(createListStateBuilder(inputAndOldStateToNewState, typeSerializer));
+			artificialStateBuilders.add(createValueStateBuilder(
+				inputAndOldStateToNewState,
+				new ValueStateDescriptor<>(
+					"valueState-" + typeSerializer.getClass().getSimpleName(),
+					typeSerializer)));
+
+			artificialStateBuilders.add(createListStateBuilder(
+				inputAndOldStateToNewState,
+				new ListStateDescriptor<>(
+					"listState-" + typeSerializer.getClass().getSimpleName(),
+					typeSerializer)));
 		}
+
+		for (Class<STATE> stateClass : stateClasses) {
+			artificialStateBuilders.add(createValueStateBuilder(
+				inputAndOldStateToNewState,
+				new ValueStateDescriptor<>(
+					"valueState-" + stateClass.getSimpleName(),
+					stateClass)));
+
+			artificialStateBuilders.add(createListStateBuilder(
+				inputAndOldStateToNewState,
+				new ListStateDescriptor<>(
+					"listState-" + stateClass.getSimpleName(),
+					stateClass)));
+		}
+
 		return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders);
 	}
 
@@ -400,17 +427,17 @@ class DataStreamAllroundTestJobFactory {
 
 	static <IN, STATE> ArtificialStateBuilder<IN> createValueStateBuilder(
 		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
-		TypeSerializer<STATE> typeSerializer) {
+		ValueStateDescriptor<STATE> valueStateDescriptor) {
 
 		return new ArtificialValueStateBuilder<>(
-			"valueState-" + typeSerializer.getClass().getSimpleName(),
+			valueStateDescriptor.getName(),
 			inputAndOldStateToNewState,
-			typeSerializer);
+			valueStateDescriptor);
 	}
 
 	static <IN, STATE> ArtificialStateBuilder<IN> createListStateBuilder(
 		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
-		TypeSerializer<STATE> typeSerializer) {
+		ListStateDescriptor<STATE> listStateDescriptor) {
 
 		JoinFunction<IN, Iterable<STATE>, List<STATE>> listStateGenerator = (first, second) -> {
 			List<STATE> newState = new ArrayList<>();
@@ -421,9 +448,9 @@ class DataStreamAllroundTestJobFactory {
 		};
 
 		return new ArtificialListStateBuilder<>(
-			"listState-" + typeSerializer.getClass().getSimpleName(),
+			listStateDescriptor.getName(),
 			listStateGenerator,
 			listStateGenerator,
-			typeSerializer);
+			listStateDescriptor);
 	}
 }
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index ea90e65..30c1c24 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -86,7 +86,8 @@ public class DataStreamAllroundTestProgram {
 							return new ComplexPayload(first, KEYED_STATE_OPER_NAME);
 						},
 					Collections.singletonList(
-						new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
+						new KryoSerializer<>(ComplexPayload.class, env.getConfig())), // custom KryoSerializer
+					Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction
 				)
 			)
 			.name(KEYED_STATE_OPER_NAME)
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
index a2c6387..b29e535 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.tests.artificialstate.builder;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 
@@ -35,7 +35,7 @@ public class ArtificialListStateBuilder<IN, STATE> extends ArtificialStateBuilde
 
 	private transient ListState<STATE> listOperatorState;
 	private transient ListState<STATE> listKeyedState;
-	private final TypeSerializer<STATE> typeSerializer;
+	private final ListStateDescriptor<STATE> listStateDescriptor;
 	private final JoinFunction<IN, Iterable<STATE>, List<STATE>> keyedStateGenerator;
 	private final JoinFunction<IN, Iterable<STATE>, List<STATE>> operatorStateGenerator;
 
@@ -43,11 +43,11 @@ public class ArtificialListStateBuilder<IN, STATE> extends ArtificialStateBuilde
 		String stateName,
 		JoinFunction<IN, Iterable<STATE>, List<STATE>> keyedStateGenerator,
 		JoinFunction<IN, Iterable<STATE>, List<STATE>> operatorStateGenerator,
-		TypeSerializer<STATE> typeSerializer) {
+		ListStateDescriptor<STATE> listStateDescriptor) {
 		super(stateName);
-		this.typeSerializer = typeSerializer;
-		this.keyedStateGenerator = keyedStateGenerator;
-		this.operatorStateGenerator = operatorStateGenerator;
+		this.listStateDescriptor = Preconditions.checkNotNull(listStateDescriptor);
+		this.keyedStateGenerator = Preconditions.checkNotNull(keyedStateGenerator);
+		this.operatorStateGenerator = Preconditions.checkNotNull(operatorStateGenerator);
 	}
 
 	@Override
@@ -58,7 +58,6 @@ public class ArtificialListStateBuilder<IN, STATE> extends ArtificialStateBuilde
 
 	@Override
 	public void initialize(FunctionInitializationContext initializationContext) throws Exception {
-		ListStateDescriptor<STATE> listStateDescriptor = new ListStateDescriptor<>(stateName, typeSerializer);
 		listOperatorState = initializationContext.getOperatorStateStore().getListState(listStateDescriptor);
 		listKeyedState = initializationContext.getKeyedStateStore().getListState(listStateDescriptor);
 	}
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
index 6d74e09..421a682 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.tests.artificialstate.builder;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.util.Preconditions;
 
 /**
  * An {@link ArtificialStateBuilder} for user {@link ValueState}s.
@@ -32,16 +32,16 @@ public class ArtificialValueStateBuilder<IN, STATE> extends ArtificialStateBuild
 	private static final long serialVersionUID = -1205814329756790916L;
 
 	private transient ValueState<STATE> valueState;
-	private final TypeSerializer<STATE> typeSerializer;
+	private final ValueStateDescriptor<STATE> valueStateDescriptor;
 	private final JoinFunction<IN, STATE, STATE> stateValueGenerator;
 
 	public ArtificialValueStateBuilder(
 		String stateName,
 		JoinFunction<IN, STATE, STATE> stateValueGenerator,
-		TypeSerializer<STATE> typeSerializer) {
+		ValueStateDescriptor<STATE> valueStateDescriptor) {
 		super(stateName);
-		this.typeSerializer = typeSerializer;
-		this.stateValueGenerator = stateValueGenerator;
+		this.valueStateDescriptor = Preconditions.checkNotNull(valueStateDescriptor);
+		this.stateValueGenerator = Preconditions.checkNotNull(stateValueGenerator);
 	}
 
 	@Override
@@ -51,8 +51,6 @@ public class ArtificialValueStateBuilder<IN, STATE> extends ArtificialStateBuild
 
 	@Override
 	public void initialize(FunctionInitializationContext initializationContext) {
-		ValueStateDescriptor<STATE> valueStateDescriptor =
-			new ValueStateDescriptor<>(stateName, typeSerializer);
 		valueState = initializationContext.getKeyedStateStore().getState(valueStateDescriptor);
 	}
 }
diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
index 0b3b5ed..4f77f95 100644
--- a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
+++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
@@ -84,8 +84,8 @@ public class StatefulStreamJobUpgradeTestProgram {
 			Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
 
 		KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
-			applyOriginalStatefulOperations(source, stateSer) :
-			applyUpgradedStatefulOperations(source, stateSer);
+			applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
+			applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
 
 		afterStatefulOperations
 			.flatMap(createSemanticsCheckMapper(pt))
@@ -109,26 +109,29 @@ public class StatefulStreamJobUpgradeTestProgram {
 
 	private static KeyedStream<Event, Integer> applyOriginalStatefulOperations(
 		KeyedStream<Event, Integer> source,
-		List<TypeSerializer<ComplexPayload>> stateSer) {
-		source = applyTestStatefulOperator("stateMap1", simpleStateUpdate("stateMap1"), source, stateSer);
-		return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer);
+		List<TypeSerializer<ComplexPayload>> stateSer,
+		List<Class<ComplexPayload>> stateClass) {
+		source = applyTestStatefulOperator("stateMap1", simpleStateUpdate("stateMap1"), source, stateSer, stateClass);
+		return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer, stateClass);
 	}
 
 	private static KeyedStream<Event, Integer> applyUpgradedStatefulOperations(
 		KeyedStream<Event, Integer> source,
-		List<TypeSerializer<ComplexPayload>> stateSer) {
-		source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer);
-		source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer);
-		return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer);
+		List<TypeSerializer<ComplexPayload>> stateSer,
+		List<Class<ComplexPayload>> stateClass) {
+		source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer, stateClass);
+		source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer, stateClass);
+		return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer, stateClass);
 	}
 
 	private static KeyedStream<Event, Integer> applyTestStatefulOperator(
 		String name,
 		JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
 		KeyedStream<Event, Integer> source,
-		List<TypeSerializer<ComplexPayload>> stateSer) {
+		List<TypeSerializer<ComplexPayload>> stateSer,
+		List<Class<ComplexPayload>> stateClass) {
 		return source
-			.map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer))
+			.map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
 			.name(name)
 			.uid(name)
 			.returns(Event.class)