You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/12/11 09:47:54 UTC

[flink] 09/15: [FLINK-11087] [state] Incorrect broadcast state K/V serializer snapshot association when restoring from 1.5.x

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8b898cd342faa9efaa14f436bedaee11e6389252
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Dec 7 00:14:13 2018 +0800

    [FLINK-11087] [state] Incorrect broadcast state K/V serializer snapshot association when restoring from 1.5.x
    
    When restoring a broadcast state's meta information from a 1.5.x
    savepoint, the
    LegacyStateMetaInfoReaders.OperatorBackendStateMetaInfoReaderV2V3
    incorrectly associates the first restored serializer as the value
    serializer, and the second restored serializer as the key serializer.
    The correct order is the reverse: the first restored serializer is the
    key serializer, and the second restored serializer is the value
    serializer.
    
    This bug prevents successful broadcast state restores from 1.5, both for
    Flink 1.6.x and 1.7.0.
    
    This commit also modifies the StatefulJobWBroadcastStateMigrationITCase
    to use different key / value types for its tested broadcast states;
    with identical key and value types, the test could not have caught
    this bug.
---
 .../state/metainfo/LegacyStateMetaInfoReaders.java |  33 ++++++-----
 .../StatefulJobWBroadcastStateMigrationITCase.java |  66 ++++++++++-----------
 .../_metadata                                      | Bin 20936 -> 21096 bytes
 .../_metadata                                      | Bin 20936 -> 21096 bytes
 .../_metadata                                      | Bin 22318 -> 22478 bytes
 .../_metadata                                      | Bin 42566 -> 42726 bytes
 6 files changed, 51 insertions(+), 48 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
index 77c267a..836edef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java
@@ -132,11 +132,6 @@ public class LegacyStateMetaInfoReaders {
 
 		static final OperatorBackendStateMetaInfoReaderV2V3 INSTANCE = new OperatorBackendStateMetaInfoReaderV2V3();
 
-		private static final String[] ORDERED_KEY_STRINGS =
-			new String[]{
-				StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(),
-				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()};
-
 		@Nonnull
 		@Override
 		public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
@@ -156,17 +151,25 @@ public class LegacyStateMetaInfoReaders {
 			final int listSize = stateSerializerAndConfigList.size();
 			StateMetaInfoSnapshot.BackendStateType stateType = listSize == 1 ?
 				StateMetaInfoSnapshot.BackendStateType.OPERATOR : StateMetaInfoSnapshot.BackendStateType.BROADCAST;
-			Map<String, TypeSerializerSnapshot<?>> serializerConfigsMap = new HashMap<>(listSize);
-			for (int i = 0; i < listSize; ++i) {
-				Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> serializerAndConf =
-					stateSerializerAndConfigList.get(i);
-
-				// this particular mapping happens to support both, V2 and V3
-				String serializerKey = ORDERED_KEY_STRINGS[ORDERED_KEY_STRINGS.length - 1 - i];
 
-				serializerConfigsMap.put(
-					serializerKey,
-					serializerAndConf.f1);
+			Map<String, TypeSerializerSnapshot<?>> serializerConfigsMap = new HashMap<>(listSize);
+			switch (stateType) {
+				case OPERATOR:
+					serializerConfigsMap.put(
+						StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+						stateSerializerAndConfigList.get(0).f1);
+					break;
+				case BROADCAST:
+					serializerConfigsMap.put(
+						StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(),
+						stateSerializerAndConfigList.get(0).f1);
+
+					serializerConfigsMap.put(
+						StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+						stateSerializerAndConfigList.get(1).f1);
+					break;
+				default:
+					throw new IllegalStateException("Unknown operator state type " + stateType);
 			}
 
 			return new StateMetaInfoSnapshot(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
index f792362..3f49d84 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
@@ -113,17 +113,17 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
 		expectedFirstState.put(2L, 2L);
 		expectedFirstState.put(3L, 3L);
 
-		final Map<String, String> expectedSecondState = new HashMap<>();
-		expectedSecondState.put("0", "0");
-		expectedSecondState.put("1", "1");
-		expectedSecondState.put("2", "2");
-		expectedSecondState.put("3", "3");
-
-		final Map<String, String> expectedThirdState = new HashMap<>();
-		expectedThirdState.put("0", "0");
-		expectedThirdState.put("1", "1");
-		expectedThirdState.put("2", "2");
-		expectedThirdState.put("3", "3");
+		final Map<String, Long> expectedSecondState = new HashMap<>();
+		expectedSecondState.put("0", 0L);
+		expectedSecondState.put("1", 1L);
+		expectedSecondState.put("2", 2L);
+		expectedSecondState.put("3", 3L);
+
+		final Map<Long, String> expectedThirdState = new HashMap<>();
+		expectedThirdState.put(0L, "0");
+		expectedThirdState.put(1L, "1");
+		expectedThirdState.put(2L, "2");
+		expectedThirdState.put(3L, "3");
 
 		if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
 			nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
@@ -171,12 +171,12 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
 				"broadcast-state-1", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
 		);
 
-		final MapStateDescriptor<String, String> secondBroadcastStateDesc = new MapStateDescriptor<>(
-				"broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+		final MapStateDescriptor<String, Long> secondBroadcastStateDesc = new MapStateDescriptor<>(
+				"broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
 		);
 
-		final MapStateDescriptor<String, String> thirdBroadcastStateDesc = new MapStateDescriptor<>(
-				"broadcast-state-3", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+		final MapStateDescriptor<Long, String> thirdBroadcastStateDesc = new MapStateDescriptor<>(
+				"broadcast-state-3", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
 		);
 
 		BroadcastStream<Tuple2<Long, Long>> npBroadcastStream = env
@@ -234,7 +234,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
 
 		private MapStateDescriptor<Long, Long> firstStateDesc;
 
-		private MapStateDescriptor<String, String> secondStateDesc;
+		private MapStateDescriptor<String, Long> secondStateDesc;
 
 		@Override
 		public void open(Configuration parameters) throws Exception {
@@ -245,7 +245,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
 			);
 
 			secondStateDesc = new MapStateDescriptor<>(
-					"broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+					"broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
 			);
 		}
 
@@ -257,7 +257,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
 		@Override
 		public void processBroadcastElement(Tuple2<Long, Long> value, Context ctx, Collector<Tuple2<Long, Long>> out) throws Exception {
 			ctx.getBroadcastState(firstStateDesc).put(value.f0, value.f1);
-			ctx.getBroadcastState(secondStateDesc).put(Long.toString(value.f0), Long.toString(value.f1));
+			ctx.getBroadcastState(secondStateDesc).put(Long.toString(value.f0), value.f1);
 		}
 	}
 
@@ -269,14 +269,14 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
 
 		private static final long serialVersionUID = 1333992081671604521L;
 
-		private MapStateDescriptor<String, String> stateDesc;
+		private MapStateDescriptor<Long, String> stateDesc;
 
 		@Override
 		public void open(Configuration parameters) throws Exception {
 			super.open(parameters);
 
 			stateDesc = new MapStateDescriptor<>(
-					"broadcast-state-3", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+					"broadcast-state-3", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
 			);
 		}
 
@@ -287,7 +287,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
 
 		@Override
 		public void processBroadcastElement(Tuple2<Long, Long> value, Context ctx, Collector<Tuple2<Long, Long>> out) throws Exception {
-			ctx.getBroadcastState(stateDesc).put(Long.toString(value.f0), Long.toString(value.f1));
+			ctx.getBroadcastState(stateDesc).put(value.f0, Long.toString(value.f1));
 		}
 	}
 
@@ -301,13 +301,13 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
 
 		private final Map<Long, Long> expectedFirstState;
 
-		private final Map<String, String> expectedSecondState;
+		private final Map<String, Long> expectedSecondState;
 
 		private MapStateDescriptor<Long, Long> firstStateDesc;
 
-		private MapStateDescriptor<String, String> secondStateDesc;
+		private MapStateDescriptor<String, Long> secondStateDesc;
 
-		CheckingKeyedBroadcastFunction(Map<Long, Long> firstState, Map<String, String> secondState) {
+		CheckingKeyedBroadcastFunction(Map<Long, Long> firstState, Map<String, Long> secondState) {
 			this.expectedFirstState = firstState;
 			this.expectedSecondState = secondState;
 		}
@@ -321,7 +321,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
 			);
 
 			secondStateDesc = new MapStateDescriptor<>(
-					"broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+					"broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
 			);
 		}
 
@@ -334,8 +334,8 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
 			}
 			Assert.assertEquals(expectedFirstState, actualFirstState);
 
-			final Map<String, String> actualSecondState = new HashMap<>();
-			for (Map.Entry<String, String> entry: ctx.getBroadcastState(secondStateDesc).immutableEntries()) {
+			final Map<String, Long> actualSecondState = new HashMap<>();
+			for (Map.Entry<String, Long> entry: ctx.getBroadcastState(secondStateDesc).immutableEntries()) {
 				actualSecondState.put(entry.getKey(), entry.getValue());
 			}
 			Assert.assertEquals(expectedSecondState, actualSecondState);
@@ -357,11 +357,11 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
 
 		private static final long serialVersionUID = 1333992081671604521L;
 
-		private final Map<String, String> expectedState;
+		private final Map<Long, String> expectedState;
 
-		private MapStateDescriptor<String, String> stateDesc;
+		private MapStateDescriptor<Long, String> stateDesc;
 
-		CheckingKeyedSingleBroadcastFunction(Map<String, String> state) {
+		CheckingKeyedSingleBroadcastFunction(Map<Long, String> state) {
 			this.expectedState = state;
 		}
 
@@ -370,14 +370,14 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
 			super.open(parameters);
 
 			stateDesc = new MapStateDescriptor<>(
-					"broadcast-state-3", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+					"broadcast-state-3", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
 			);
 		}
 
 		@Override
 		public void processElement(Tuple2<Long, Long> value, ReadOnlyContext ctx, Collector<Tuple2<Long, Long>> out) throws Exception {
-			final Map<String, String> actualState = new HashMap<>();
-			for (Map.Entry<String, String> entry: ctx.getBroadcastState(stateDesc).immutableEntries()) {
+			final Map<Long, String> actualState = new HashMap<>();
+			for (Map.Entry<Long, String> entry: ctx.getBroadcastState(stateDesc).immutableEntries()) {
 				actualState.put(entry.getKey(), entry.getValue());
 			}
 			Assert.assertEquals(expectedState, actualState);
diff --git a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata
index aed6183..2e02f1b 100644
Binary files a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata and b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-savepoint/_metadata b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-savepoint/_metadata
index 13e8d32..4ca91d3 100644
Binary files a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-savepoint/_metadata and b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.5-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
index 9e2b086..8b2a63a 100644
Binary files a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata and b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata differ
diff --git a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-savepoint/_metadata b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-savepoint/_metadata
index cceb5bf..cde6d5c 100644
Binary files a/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-savepoint/_metadata and b/flink-tests/src/test/resources/new-stateful-broadcast-udf-migration-itcase-flink1.6-savepoint/_metadata differ