You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/07/30 09:42:41 UTC
[flink] 01/02: [FLINK-8993] [tests] Let general purpose DataStream
job use KryoSerializer via type extraction
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0d6040b1f44e3884b97e8eaef063c25a663c52cc
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jul 25 14:58:46 2018 +0800
[FLINK-8993] [tests] Let general purpose DataStream job use KryoSerializer via type extraction
This closes #6413.
---
.../tests/DataStreamAllroundTestJobFactory.java | 45 +++++++++++++++++-----
.../tests/DataStreamAllroundTestProgram.java | 3 +-
.../builder/ArtificialListStateBuilder.java | 13 +++----
.../builder/ArtificialValueStateBuilder.java | 12 +++---
.../tests/StatefulStreamJobUpgradeTestProgram.java | 25 ++++++------
5 files changed, 63 insertions(+), 35 deletions(-)
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 1238072..fb92960 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
@@ -382,13 +384,38 @@ class DataStreamAllroundTestJobFactory {
static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> createArtificialKeyedStateMapper(
MapFunction<IN, OUT> mapFunction,
JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
- List<TypeSerializer<STATE>> stateSerializers) {
+ List<TypeSerializer<STATE>> stateSerializers,
+ List<Class<STATE>> stateClasses) {
List<ArtificialStateBuilder<IN>> artificialStateBuilders = new ArrayList<>(stateSerializers.size());
for (TypeSerializer<STATE> typeSerializer : stateSerializers) {
- artificialStateBuilders.add(createValueStateBuilder(inputAndOldStateToNewState, typeSerializer));
- artificialStateBuilders.add(createListStateBuilder(inputAndOldStateToNewState, typeSerializer));
+ artificialStateBuilders.add(createValueStateBuilder(
+ inputAndOldStateToNewState,
+ new ValueStateDescriptor<>(
+ "valueState-" + typeSerializer.getClass().getSimpleName(),
+ typeSerializer)));
+
+ artificialStateBuilders.add(createListStateBuilder(
+ inputAndOldStateToNewState,
+ new ListStateDescriptor<>(
+ "listState-" + typeSerializer.getClass().getSimpleName(),
+ typeSerializer)));
}
+
+ for (Class<STATE> stateClass : stateClasses) {
+ artificialStateBuilders.add(createValueStateBuilder(
+ inputAndOldStateToNewState,
+ new ValueStateDescriptor<>(
+ "valueState-" + stateClass.getSimpleName(),
+ stateClass)));
+
+ artificialStateBuilders.add(createListStateBuilder(
+ inputAndOldStateToNewState,
+ new ListStateDescriptor<>(
+ "listState-" + stateClass.getSimpleName(),
+ stateClass)));
+ }
+
return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders);
}
@@ -400,17 +427,17 @@ class DataStreamAllroundTestJobFactory {
static <IN, STATE> ArtificialStateBuilder<IN> createValueStateBuilder(
JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
- TypeSerializer<STATE> typeSerializer) {
+ ValueStateDescriptor<STATE> valueStateDescriptor) {
return new ArtificialValueStateBuilder<>(
- "valueState-" + typeSerializer.getClass().getSimpleName(),
+ valueStateDescriptor.getName(),
inputAndOldStateToNewState,
- typeSerializer);
+ valueStateDescriptor);
}
static <IN, STATE> ArtificialStateBuilder<IN> createListStateBuilder(
JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
- TypeSerializer<STATE> typeSerializer) {
+ ListStateDescriptor<STATE> listStateDescriptor) {
JoinFunction<IN, Iterable<STATE>, List<STATE>> listStateGenerator = (first, second) -> {
List<STATE> newState = new ArrayList<>();
@@ -421,9 +448,9 @@ class DataStreamAllroundTestJobFactory {
};
return new ArtificialListStateBuilder<>(
- "listState-" + typeSerializer.getClass().getSimpleName(),
+ listStateDescriptor.getName(),
listStateGenerator,
listStateGenerator,
- typeSerializer);
+ listStateDescriptor);
}
}
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index ea90e65..30c1c24 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -86,7 +86,8 @@ public class DataStreamAllroundTestProgram {
return new ComplexPayload(first, KEYED_STATE_OPER_NAME);
},
Collections.singletonList(
- new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
+ new KryoSerializer<>(ComplexPayload.class, env.getConfig())), // custom KryoSerializer
+ Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction
)
)
.name(KEYED_STATE_OPER_NAME)
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
index a2c6387..b29e535 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.tests.artificialstate.builder;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.util.Preconditions;
import java.util.List;
@@ -35,7 +35,7 @@ public class ArtificialListStateBuilder<IN, STATE> extends ArtificialStateBuilde
private transient ListState<STATE> listOperatorState;
private transient ListState<STATE> listKeyedState;
- private final TypeSerializer<STATE> typeSerializer;
+ private final ListStateDescriptor<STATE> listStateDescriptor;
private final JoinFunction<IN, Iterable<STATE>, List<STATE>> keyedStateGenerator;
private final JoinFunction<IN, Iterable<STATE>, List<STATE>> operatorStateGenerator;
@@ -43,11 +43,11 @@ public class ArtificialListStateBuilder<IN, STATE> extends ArtificialStateBuilde
String stateName,
JoinFunction<IN, Iterable<STATE>, List<STATE>> keyedStateGenerator,
JoinFunction<IN, Iterable<STATE>, List<STATE>> operatorStateGenerator,
- TypeSerializer<STATE> typeSerializer) {
+ ListStateDescriptor<STATE> listStateDescriptor) {
super(stateName);
- this.typeSerializer = typeSerializer;
- this.keyedStateGenerator = keyedStateGenerator;
- this.operatorStateGenerator = operatorStateGenerator;
+ this.listStateDescriptor = Preconditions.checkNotNull(listStateDescriptor);
+ this.keyedStateGenerator = Preconditions.checkNotNull(keyedStateGenerator);
+ this.operatorStateGenerator = Preconditions.checkNotNull(operatorStateGenerator);
}
@Override
@@ -58,7 +58,6 @@ public class ArtificialListStateBuilder<IN, STATE> extends ArtificialStateBuilde
@Override
public void initialize(FunctionInitializationContext initializationContext) throws Exception {
- ListStateDescriptor<STATE> listStateDescriptor = new ListStateDescriptor<>(stateName, typeSerializer);
listOperatorState = initializationContext.getOperatorStateStore().getListState(listStateDescriptor);
listKeyedState = initializationContext.getKeyedStateStore().getListState(listStateDescriptor);
}
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
index 6d74e09..421a682 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.tests.artificialstate.builder;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.util.Preconditions;
/**
* An {@link ArtificialStateBuilder} for user {@link ValueState}s.
@@ -32,16 +32,16 @@ public class ArtificialValueStateBuilder<IN, STATE> extends ArtificialStateBuild
private static final long serialVersionUID = -1205814329756790916L;
private transient ValueState<STATE> valueState;
- private final TypeSerializer<STATE> typeSerializer;
+ private final ValueStateDescriptor<STATE> valueStateDescriptor;
private final JoinFunction<IN, STATE, STATE> stateValueGenerator;
public ArtificialValueStateBuilder(
String stateName,
JoinFunction<IN, STATE, STATE> stateValueGenerator,
- TypeSerializer<STATE> typeSerializer) {
+ ValueStateDescriptor<STATE> valueStateDescriptor) {
super(stateName);
- this.typeSerializer = typeSerializer;
- this.stateValueGenerator = stateValueGenerator;
+ this.valueStateDescriptor = Preconditions.checkNotNull(valueStateDescriptor);
+ this.stateValueGenerator = Preconditions.checkNotNull(stateValueGenerator);
}
@Override
@@ -51,8 +51,6 @@ public class ArtificialValueStateBuilder<IN, STATE> extends ArtificialStateBuild
@Override
public void initialize(FunctionInitializationContext initializationContext) {
- ValueStateDescriptor<STATE> valueStateDescriptor =
- new ValueStateDescriptor<>(stateName, typeSerializer);
valueState = initializationContext.getKeyedStateStore().getState(valueStateDescriptor);
}
}
diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
index 0b3b5ed..4f77f95 100644
--- a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
+++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
@@ -84,8 +84,8 @@ public class StatefulStreamJobUpgradeTestProgram {
Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
- applyOriginalStatefulOperations(source, stateSer) :
- applyUpgradedStatefulOperations(source, stateSer);
+ applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
+ applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
afterStatefulOperations
.flatMap(createSemanticsCheckMapper(pt))
@@ -109,26 +109,29 @@ public class StatefulStreamJobUpgradeTestProgram {
private static KeyedStream<Event, Integer> applyOriginalStatefulOperations(
KeyedStream<Event, Integer> source,
- List<TypeSerializer<ComplexPayload>> stateSer) {
- source = applyTestStatefulOperator("stateMap1", simpleStateUpdate("stateMap1"), source, stateSer);
- return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer);
+ List<TypeSerializer<ComplexPayload>> stateSer,
+ List<Class<ComplexPayload>> stateClass) {
+ source = applyTestStatefulOperator("stateMap1", simpleStateUpdate("stateMap1"), source, stateSer, stateClass);
+ return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer, stateClass);
}
private static KeyedStream<Event, Integer> applyUpgradedStatefulOperations(
KeyedStream<Event, Integer> source,
- List<TypeSerializer<ComplexPayload>> stateSer) {
- source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer);
- source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer);
- return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer);
+ List<TypeSerializer<ComplexPayload>> stateSer,
+ List<Class<ComplexPayload>> stateClass) {
+ source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer, stateClass);
+ source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer, stateClass);
+ return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer, stateClass);
}
private static KeyedStream<Event, Integer> applyTestStatefulOperator(
String name,
JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
KeyedStream<Event, Integer> source,
- List<TypeSerializer<ComplexPayload>> stateSer) {
+ List<TypeSerializer<ComplexPayload>> stateSer,
+ List<Class<ComplexPayload>> stateClass) {
return source
- .map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer))
+ .map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
.name(name)
.uid(name)
.returns(Event.class)