Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/20 07:34:15 UTC

[GitHub] [flink] tzulitai commented on a change in pull request #12263: [FLINK-16998][core] Support backwards compatibility for upgraded RowSerializer

tzulitai commented on a change in pull request #12263:
URL: https://github.com/apache/flink/pull/12263#discussion_r427782378



##########
File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##########
@@ -367,36 +367,42 @@ public int getVersion() {
 	/**
 	 * A {@link TypeSerializerSnapshot} for RowSerializer.
 	 */
-	// TODO not fully functional yet due to FLINK-17520
 	public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot<Row, RowSerializer> {
 
 		private static final int VERSION = 3;
 
-		private static final int VERSION_WITHOUT_ROW_KIND = 2;
+		private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
 
-		private boolean legacyModeEnabled = false;
+		private int version = VERSION;
 
 		public RowSerializerSnapshot() {
 			super(RowSerializer.class);
 		}
 
 		RowSerializerSnapshot(RowSerializer serializerInstance) {
 			super(serializerInstance);
+			this.version = translateVersion(serializerInstance);
 		}
 
 		@Override
 		protected int getCurrentOuterSnapshotVersion() {
-			return VERSION;
+			return version;

Review comment:
       This method is only relevant when writing snapshots; it is not used on restore.
   Therefore, it should always return the latest version, not the older version that was read.
   ```suggestion
   			return VERSION;
   ```
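A minimal, Flink-free sketch of the split described above, assuming the outer snapshot version is the only state involved. `SketchRowSnapshot` and its methods are hypothetical stand-ins that only mirror the names of the real `RowSerializerSnapshot` hooks: the version used for writing is always the latest constant, while the version read on restore is tracked in a separate field.

```java
/**
 * Hypothetical sketch (not Flink code): the written version is always the latest,
 * while the version read on restore is stored separately.
 */
final class SketchRowSnapshot {

    // Latest outer snapshot version; every newly written snapshot uses this.
    static final int VERSION = 3;

    // Version found in the snapshot being restored; defaults to the latest
    // when this snapshot was created for writing rather than read back.
    private int readVersion = VERSION;

    // Mirrors getCurrentOuterSnapshotVersion(): only consulted when writing,
    // so it deliberately ignores readVersion.
    int getCurrentOuterSnapshotVersion() {
        return VERSION;
    }

    // Mirrors readOuterSnapshot(): only records what was read.
    void readOuterSnapshot(int readOuterSnapshotVersion) {
        this.readVersion = readOuterSnapshotVersion;
    }

    int getReadVersion() {
        return readVersion;
    }
}

class SketchRowSnapshotDemo {
    public static void main(String[] args) {
        SketchRowSnapshot restored = new SketchRowSnapshot();
        restored.readOuterSnapshot(2);
        System.out.println(restored.getReadVersion());                 // 2
        System.out.println(restored.getCurrentOuterSnapshotVersion()); // 3
    }
}
```

In this sketch, the written version never depends on restored state, so restoring from version 2 does not cause version 2 to be written again.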

##########
File path: flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
##########
@@ -76,14 +84,16 @@ public RowSerializerUpgradeTest(TestSpecification<Row, Row> testSpecification) {
 	public static final class RowSerializerSetup implements TypeSerializerUpgradeTestBase.PreUpgradeSetup<Row> {
 		@Override
 		public TypeSerializer<Row> createPriorSerializer() {
-			return stringLongRowSupplier();
+			return createRowSerializer(true);

Review comment:
       To make this really clear, I think we should make the `RowSerializer` constructor that accepts the `legacyModeEnabled` flag private, so that only `RowSerializerSnapshot#createOuterSerializer` can use it. This concern should not leak into tests.
   
   The bottom line is that only the snapshots should be concerned with creating an old serializer that uses a previous format.
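A hedged sketch of the access pattern proposed above, using hypothetical `SketchRowSerializer` / `Snapshot` classes rather than Flink's real ones. It relies on the Java rule that a nested class may call a private constructor of its enclosing class, so the legacy-mode constructor is reachable from the snapshot but not from tests or other callers. Deriving the flag from the read version is an assumption made for illustration.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch (not Flink's RowSerializer): only the nested snapshot
 * can create a serializer that uses the old, legacy format.
 */
final class SketchRowSerializer {

    private final String[] fieldTypes;
    private final boolean legacyModeEnabled;

    // The only constructor visible to other classes: always uses the new format.
    public SketchRowSerializer(String[] fieldTypes) {
        this(fieldTypes, false);
    }

    // Private: callers outside this top-level class cannot request legacy mode.
    private SketchRowSerializer(String[] fieldTypes, boolean legacyModeEnabled) {
        this.fieldTypes = Arrays.copyOf(fieldTypes, fieldTypes.length);
        this.legacyModeEnabled = legacyModeEnabled;
    }

    int getArity() {
        return fieldTypes.length;
    }

    boolean isLegacyModeEnabled() {
        return legacyModeEnabled;
    }

    static final class Snapshot {
        static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;

        private final int readVersion;

        Snapshot(int readVersion) {
            this.readVersion = readVersion;
        }

        // Analogue of createOuterSerializer(): the single place that decides the flag.
        SketchRowSerializer createOuterSerializer(String[] fieldTypes) {
            boolean legacy = readVersion <= LAST_VERSION_WITHOUT_ROW_KIND;
            return new SketchRowSerializer(fieldTypes, legacy);
        }
    }
}

class SketchRowSerializerDemo {
    public static void main(String[] args) {
        String[] fields = {"STRING", "BIGINT"};
        SketchRowSerializer current = new SketchRowSerializer(fields);
        SketchRowSerializer restored =
                new SketchRowSerializer.Snapshot(2).createOuterSerializer(fields);
        System.out.println(current.isLegacyModeEnabled());  // false
        System.out.println(restored.isLegacyModeEnabled()); // true
    }
}
```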

##########
File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##########
@@ -367,36 +367,42 @@ public int getVersion() {
 	/**
 	 * A {@link TypeSerializerSnapshot} for RowSerializer.
 	 */
-	// TODO not fully functional yet due to FLINK-17520
 	public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot<Row, RowSerializer> {
 
 		private static final int VERSION = 3;
 
-		private static final int VERSION_WITHOUT_ROW_KIND = 2;
+		private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
 
-		private boolean legacyModeEnabled = false;
+		private int version = VERSION;

Review comment:
       Maybe rename this to `readVersion`, to better convey how it differs from the static `VERSION` constant.

##########
File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##########
@@ -367,36 +367,42 @@ public int getVersion() {
 	/**
 	 * A {@link TypeSerializerSnapshot} for RowSerializer.
 	 */
-	// TODO not fully functional yet due to FLINK-17520
 	public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot<Row, RowSerializer> {
 
 		private static final int VERSION = 3;
 
-		private static final int VERSION_WITHOUT_ROW_KIND = 2;
+		private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
 
-		private boolean legacyModeEnabled = false;
+		private int version = VERSION;
 
 		public RowSerializerSnapshot() {
 			super(RowSerializer.class);
 		}
 
 		RowSerializerSnapshot(RowSerializer serializerInstance) {
 			super(serializerInstance);
+			this.version = translateVersion(serializerInstance);
 		}
 
 		@Override
 		protected int getCurrentOuterSnapshotVersion() {
-			return VERSION;
+			return version;
 		}
 
 		@Override
 		protected void readOuterSnapshot(
 				int readOuterSnapshotVersion,
 				DataInputView in,
 				ClassLoader userCodeClassLoader) {
-			if (readOuterSnapshotVersion == VERSION_WITHOUT_ROW_KIND) {
-				legacyModeEnabled = true;
+			version = readOuterSnapshotVersion;
+		}
+
+		@Override
+		protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(RowSerializer newSerializer) {
+			if (version == translateVersion(newSerializer)) {

Review comment:
       If I understand this correctly, this check doesn't need to be performed against the new serializer.
   
   Starting with 1.11, a newly created serializer always writes in the new format (i.e. `legacyModeEnabled` is always `false`), and the version of the `RowSerializerSnapshot` was also bumped in 1.11.
   Therefore, whether or not migration is needed is already fully encoded in ("piggy-backed" on) the version of the `RowSerializerSnapshot`.
   
   That is, this can be simplified to:
   ```suggestion
   			if (readVersion <= LAST_VERSION_WITHOUT_ROW_KIND) {
   				return OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION;
   			}
   			return OuterSchemaCompatibility.COMPATIBLE_AS_IS;
   ```
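The simplified check can be exercised in isolation with a small, Flink-free sketch. `SketchCompatibility` and its local `OuterSchemaCompatibility` enum are hypothetical stand-ins that only mirror the two constant names used in the suggestion; the sketch assumes, as argued above, that a new serializer always writes the new format, so the read version alone decides the outcome.

```java
/**
 * Hypothetical sketch (not Flink code): migration is decided solely by the
 * version read from the restored snapshot.
 */
final class SketchCompatibility {

    // Local stand-in mirroring the two results used in the suggestion.
    enum OuterSchemaCompatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION }

    static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;

    static OuterSchemaCompatibility resolve(int readVersion) {
        // Snapshots written before the row kind existed use the old format,
        // while any new serializer writes the new one, so they must be migrated.
        if (readVersion <= LAST_VERSION_WITHOUT_ROW_KIND) {
            return OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION;
        }
        return OuterSchemaCompatibility.COMPATIBLE_AS_IS;
    }
}

class SketchCompatibilityDemo {
    public static void main(String[] args) {
        // Prints:
        // 1 -> COMPATIBLE_AFTER_MIGRATION
        // 2 -> COMPATIBLE_AFTER_MIGRATION
        // 3 -> COMPATIBLE_AS_IS
        for (int v = 1; v <= 3; v++) {
            System.out.println(v + " -> " + SketchCompatibility.resolve(v));
        }
    }
}
```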

##########
File path: flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
##########
@@ -76,14 +84,16 @@ public RowSerializerUpgradeTest(TestSpecification<Row, Row> testSpecification) {
 	public static final class RowSerializerSetup implements TypeSerializerUpgradeTestBase.PreUpgradeSetup<Row> {
 		@Override
 		public TypeSerializer<Row> createPriorSerializer() {
-			return stringLongRowSupplier();
+			return createRowSerializer(true);

Review comment:
       I think there's a misunderstanding about how this `createPriorSerializer` method is used.
   This method is only ever used when generating the test snapshot files on the target branch.
   
   For example:
   - If we check out the `release-1.10` branch and generate a snapshot, this should create a serializer that writes in the old format.
   - If we check out the `release-1.11` branch and generate a snapshot, this should create a serializer that writes in the new format.
   
   With the current changes in the PR, the generated snapshot files will always be written in the old format, regardless of which branch you're on.
   
   ```suggestion
   			// in older branches, this writes in the old format;
   			// in newer branches (>= 1.11), this writes in the new format
   			return new RowSerializer(fieldSerializers);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org