You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/08/02 16:55:47 UTC
[flink] branch master updated: [FLINK-13541][state-processor-api]
State Processor Api sets the wrong key selector when writing savepoints
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 61352fb [FLINK-13541][state-processor-api] State Processor Api sets the wrong key selector when writing savepoints
61352fb is described below
commit 61352fb69b24ea8c2de5e2c8840cabb3acc2202e
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Thu Aug 1 15:22:23 2019 -0500
[FLINK-13541][state-processor-api] State Processor Api sets the wrong key selector when writing savepoints
This closes #9324
---
.../flink/state/api/BootstrapTransformation.java | 46 ++++++++++++++--------
.../state/api/BootstrapTransformationTest.java | 35 ++++++++++++++++
2 files changed, 64 insertions(+), 17 deletions(-)
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
index a5163f3..5e2a7c2 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
@@ -68,7 +68,11 @@ public class BootstrapTransformation<T> {
/** Partitioner for the bootstrapping data set. Only relevant if this bootstraps partitioned state. */
@Nullable
- private final HashSelector<T> keySelector;
+ private final KeySelector<T, ?> originalKeySelector;
+
+ /** Partitioner for distributing data by key group. Only relevant if this bootstraps partitioned state. */
+ @Nullable
+ private final HashSelector<T> hashKeySelector;
/** Type information for the key of the bootstrapped state. Only relevant if this bootstraps partitioned state. */
@Nullable
@@ -84,7 +88,8 @@ public class BootstrapTransformation<T> {
this.dataSet = dataSet;
this.operatorMaxParallelism = operatorMaxParallelism;
this.factory = factory;
- this.keySelector = null;
+ this.originalKeySelector = null;
+ this.hashKeySelector = null;
this.keyType = null;
}
@@ -97,7 +102,8 @@ public class BootstrapTransformation<T> {
this.dataSet = dataSet;
this.operatorMaxParallelism = operatorMaxParallelism;
this.factory = factory;
- this.keySelector = new HashSelector<>(keySelector);
+ this.originalKeySelector = keySelector;
+ this.hashKeySelector = new HashSelector<>(keySelector);
this.keyType = keyType;
}
@@ -135,16 +141,8 @@ public class BootstrapTransformation<T> {
int localMaxParallelism) {
DataSet<T> input = dataSet;
- if (keySelector != null) {
- input = dataSet.partitionCustom(new KeyGroupRangePartitioner(localMaxParallelism), keySelector);
- }
-
- final StreamConfig config;
- if (keyType == null) {
- config = new BoundedStreamConfig();
- } else {
- TypeSerializer<?> keySerializer = keyType.createSerializer(dataSet.getExecutionEnvironment().getConfig());
- config = new BoundedStreamConfig(keySerializer, keySelector);
+ if (originalKeySelector != null) {
+ input = dataSet.partitionCustom(new KeyGroupRangePartitioner(localMaxParallelism), hashKeySelector);
}
StreamOperator<TaggedOperatorSubtaskState> operator = factory.createOperator(
@@ -152,11 +150,8 @@ public class BootstrapTransformation<T> {
savepointPath);
operator = dataSet.clean(operator);
- config.setStreamOperator(operator);
- config.setOperatorName(operatorID.toHexString());
- config.setOperatorID(operatorID);
- config.setStateBackend(stateBackend);
+ final StreamConfig config = getConfig(operatorID, stateBackend, operator);
BoundedOneInputStreamTaskRunner<T> operatorRunner = new BoundedOneInputStreamTaskRunner<>(
config,
@@ -178,6 +173,23 @@ public class BootstrapTransformation<T> {
return subtaskStates;
}
+ @VisibleForTesting
+ StreamConfig getConfig(OperatorID operatorID, StateBackend stateBackend, StreamOperator<TaggedOperatorSubtaskState> operator) {
+ final StreamConfig config;
+ if (keyType == null) {
+ config = new BoundedStreamConfig();
+ } else {
+ TypeSerializer<?> keySerializer = keyType.createSerializer(dataSet.getExecutionEnvironment().getConfig());
+ config = new BoundedStreamConfig(keySerializer, originalKeySelector);
+ }
+
+ config.setStreamOperator(operator);
+ config.setOperatorName(operatorID.toHexString());
+ config.setOperatorID(operatorID);
+ config.setStateBackend(stateBackend);
+ return config;
+ }
+
private static <T> int getParallelism(MapPartitionOperator<T, TaggedOperatorSubtaskState> subtaskStates) {
int parallelism = subtaskStates.getParallelism();
if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
index 64994c4..72cb218 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.state.api;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.core.fs.Path;
@@ -28,9 +29,11 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
+import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.StateBootstrapFunction;
import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
import org.apache.flink.state.api.runtime.OperatorIDGenerator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
@@ -39,6 +42,7 @@ import org.junit.Test;
/**
* Tests for bootstrap transformations.
*/
+@SuppressWarnings("serial")
public class BootstrapTransformationTest extends AbstractTestBase {
@Test
@@ -138,6 +142,30 @@ public class BootstrapTransformationTest extends AbstractTestBase {
Assert.assertEquals("The parallelism of a data set should be constrained my the savepoint max parallelism", 1, getParallelism(result));
}
+ @Test
+ public void testStreamConfig() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSource<String> input = env.fromElements("");
+
+ BootstrapTransformation<String> transformation = OperatorTransformation
+ .bootstrapWith(input)
+ .keyBy(new CustomKeySelector())
+ .transform(new ExampleKeyedStateBootstrapFunction());
+
+ StreamConfig config = transformation.getConfig(OperatorIDGenerator.fromUid("uid"), new MemoryStateBackend(), null);
+ KeySelector selector = config.getStatePartitioner(0, Thread.currentThread().getContextClassLoader());
+
+ Assert.assertEquals("Incorrect key selector forwarded to stream operator", CustomKeySelector.class, selector.getClass());
+ }
+
+ private static class CustomKeySelector implements KeySelector<String, String> {
+
+ @Override
+ public String getKey(String value) throws Exception {
+ return value;
+ }
+ }
+
private static <T> int getParallelism(DataSet<T> dataSet) {
//All concrete implementations of DataSet are operators so this should always be safe.
return ((Operator) dataSet).getParallelism();
@@ -164,4 +192,11 @@ public class BootstrapTransformationTest extends AbstractTestBase {
public void initializeState(FunctionInitializationContext context) throws Exception {
}
}
+
+ private static class ExampleKeyedStateBootstrapFunction extends KeyedStateBootstrapFunction<String, String> {
+
+ @Override
+ public void processElement(String value, Context ctx) throws Exception {
+ }
+ }
}