You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by se...@apache.org on 2019/08/02 16:55:47 UTC

[flink] branch master updated: [FLINK-13541][state-processor-api] State Processor Api sets the wrong key selector when writing savepoints

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 61352fb  [FLINK-13541][state-processor-api] State Processor Api sets the wrong key selector when writing savepoints
61352fb is described below

commit 61352fb69b24ea8c2de5e2c8840cabb3acc2202e
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Thu Aug 1 15:22:23 2019 -0500

    [FLINK-13541][state-processor-api] State Processor Api sets the wrong key selector when writing savepoints
    
    This closes #9324
---
 .../flink/state/api/BootstrapTransformation.java   | 46 ++++++++++++++--------
 .../state/api/BootstrapTransformationTest.java     | 35 ++++++++++++++++
 2 files changed, 64 insertions(+), 17 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
index a5163f3..5e2a7c2 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
@@ -68,7 +68,11 @@ public class BootstrapTransformation<T> {
 
 	/** Partitioner for the bootstrapping data set. Only relevant if this bootstraps partitioned state. */
 	@Nullable
-	private final HashSelector<T> keySelector;
+	private final KeySelector<T, ?> originalKeySelector;
+
+	/** Partitioner for distributing data by key group. Only relevant if this bootstraps partitioned state. */
+	@Nullable
+	private final HashSelector<T> hashKeySelector;
 
 	/** Type information for the key of the bootstrapped state. Only relevant if this bootstraps partitioned state. */
 	@Nullable
@@ -84,7 +88,8 @@ public class BootstrapTransformation<T> {
 		this.dataSet = dataSet;
 		this.operatorMaxParallelism = operatorMaxParallelism;
 		this.factory = factory;
-		this.keySelector = null;
+		this.originalKeySelector = null;
+		this.hashKeySelector = null;
 		this.keyType = null;
 	}
 
@@ -97,7 +102,8 @@ public class BootstrapTransformation<T> {
 		this.dataSet = dataSet;
 		this.operatorMaxParallelism = operatorMaxParallelism;
 		this.factory = factory;
-		this.keySelector = new HashSelector<>(keySelector);
+		this.originalKeySelector = keySelector;
+		this.hashKeySelector = new HashSelector<>(keySelector);
 		this.keyType = keyType;
 	}
 
@@ -135,16 +141,8 @@ public class BootstrapTransformation<T> {
 		int localMaxParallelism) {
 
 		DataSet<T> input = dataSet;
-		if (keySelector != null) {
-			input = dataSet.partitionCustom(new KeyGroupRangePartitioner(localMaxParallelism), keySelector);
-		}
-
-		final StreamConfig config;
-		if (keyType == null) {
-			config = new BoundedStreamConfig();
-		} else {
-			TypeSerializer<?> keySerializer = keyType.createSerializer(dataSet.getExecutionEnvironment().getConfig());
-			config = new BoundedStreamConfig(keySerializer, keySelector);
+		if (originalKeySelector != null) {
+			input = dataSet.partitionCustom(new KeyGroupRangePartitioner(localMaxParallelism), hashKeySelector);
 		}
 
 		StreamOperator<TaggedOperatorSubtaskState> operator = factory.createOperator(
@@ -152,11 +150,8 @@ public class BootstrapTransformation<T> {
 			savepointPath);
 
 		operator = dataSet.clean(operator);
-		config.setStreamOperator(operator);
 
-		config.setOperatorName(operatorID.toHexString());
-		config.setOperatorID(operatorID);
-		config.setStateBackend(stateBackend);
+		final StreamConfig config = getConfig(operatorID, stateBackend, operator);
 
 		BoundedOneInputStreamTaskRunner<T> operatorRunner = new BoundedOneInputStreamTaskRunner<>(
 			config,
@@ -178,6 +173,23 @@ public class BootstrapTransformation<T> {
 		return subtaskStates;
 	}
 
+	@VisibleForTesting
+	StreamConfig getConfig(OperatorID operatorID, StateBackend stateBackend, StreamOperator<TaggedOperatorSubtaskState> operator) {
+		final StreamConfig config;
+		if (keyType == null) {
+			config = new BoundedStreamConfig();
+		} else {
+			TypeSerializer<?> keySerializer = keyType.createSerializer(dataSet.getExecutionEnvironment().getConfig());
+			config = new BoundedStreamConfig(keySerializer, originalKeySelector);
+		}
+
+		config.setStreamOperator(operator);
+		config.setOperatorName(operatorID.toHexString());
+		config.setOperatorID(operatorID);
+		config.setStateBackend(stateBackend);
+		return config;
+	}
+
 	private static <T> int getParallelism(MapPartitionOperator<T, TaggedOperatorSubtaskState> subtaskStates) {
 		int parallelism = subtaskStates.getParallelism();
 		if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
index 64994c4..72cb218 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/BootstrapTransformationTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.state.api;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.Operator;
 import org.apache.flink.core.fs.Path;
@@ -28,9 +29,11 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
+import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
 import org.apache.flink.state.api.functions.StateBootstrapFunction;
 import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
 import org.apache.flink.state.api.runtime.OperatorIDGenerator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.Assert;
@@ -39,6 +42,7 @@ import org.junit.Test;
 /**
  * Tests for bootstrap transformations.
  */
+@SuppressWarnings("serial")
 public class BootstrapTransformationTest extends AbstractTestBase {
 
 	@Test
@@ -138,6 +142,30 @@ public class BootstrapTransformationTest extends AbstractTestBase {
 		Assert.assertEquals("The parallelism of a data set should be constrained my the savepoint max parallelism", 1, getParallelism(result));
 	}
 
+	@Test
+	public void testStreamConfig() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSource<String> input = env.fromElements("");
+
+		BootstrapTransformation<String> transformation = OperatorTransformation
+			.bootstrapWith(input)
+			.keyBy(new CustomKeySelector())
+			.transform(new ExampleKeyedStateBootstrapFunction());
+
+		StreamConfig config = transformation.getConfig(OperatorIDGenerator.fromUid("uid"), new MemoryStateBackend(), null);
+		KeySelector selector = config.getStatePartitioner(0, Thread.currentThread().getContextClassLoader());
+
+		Assert.assertEquals("Incorrect key selector forwarded to stream operator", CustomKeySelector.class, selector.getClass());
+	}
+
+	private static class CustomKeySelector implements KeySelector<String, String> {
+
+		@Override
+		public String getKey(String value) throws Exception {
+			return value;
+		}
+	}
+
 	private static <T> int getParallelism(DataSet<T> dataSet) {
 		//All concrete implementations of DataSet are operators so this should always be safe.
 		return ((Operator) dataSet).getParallelism();
@@ -164,4 +192,11 @@ public class BootstrapTransformationTest extends AbstractTestBase {
 		public void initializeState(FunctionInitializationContext context) throws Exception {
 		}
 	}
+
+	private static class ExampleKeyedStateBootstrapFunction extends KeyedStateBootstrapFunction<String, String> {
+
+		@Override
+		public void processElement(String value, Context ctx) throws Exception {
+		}
+	}
 }