You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/12/11 09:47:46 UTC

[flink] 01/15: [hotfix] [state backend, tests] Certain StateBackendMigrationTestBase tests should fail if exception isn't thrown

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bc41b756ff180d9903ba22c0d83635c38a11098f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Dec 7 20:42:52 2018 +0800

    [hotfix] [state backend, tests] Certain StateBackendMigrationTestBase tests should fail if exception isn't thrown
    
    This commit strengthens the tests in StateBackendMigrationTestBase that
    rely on a certain state operation (restoring state, accessing state,
    etc.) failing in order to assert correct behaviour. Previously, these
    tests did not actually fail if no exception was thrown when one was
    expected.
    
    This also uncovered some bugs in the tests themselves, which had been
    verifying incorrect behaviour.
---
 .../state/StateBackendMigrationTestBase.java       | 27 ++++++++++++++++++----
 1 file changed, 23 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index f5f30d5..dff49f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ -245,6 +245,8 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 
 			// the new serializer is INCOMPATIBLE, so registering the state should fail
 			backend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
+
+			Assert.fail("should have failed");
 		} catch (Exception e) {
 			Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
 		}finally {
@@ -288,10 +290,12 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 
 			kvId = new ListStateDescriptor<>(
 				stateName,
-				new TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION));
+				new TestSerializer(SerializerCompatibilityType.INCOMPATIBLE));
 
 			// the new serializer is INCOMPATIBLE, so registering the state should fail
 			backend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
+
+			Assert.fail("should have failed");
 		} catch (Exception e) {
 			Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
 		} finally {
@@ -358,6 +362,8 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 			try {
 				// the new key serializer is incompatible; this should fail the restore
 				restoreKeyedBackend(new TestSerializer(SerializerCompatibilityType.INCOMPATIBLE), snapshot);
+
+				Assert.fail("should have failed");
 			} catch (Exception e) {
 				Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
 			}
@@ -365,6 +371,8 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 			try {
 				// the new key serializer requires migration; this should fail the restore
 				restoreKeyedBackend(new TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION), snapshot);
+
+				Assert.fail("should have failed");
 			} catch (Exception e) {
 				Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
 			}
@@ -397,26 +405,33 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 			KeyedStateHandle snapshot = runSnapshot(
 				backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
 				sharedStateRegistry);
-			backend.dispose();
 
+			// test incompatible namespace serializer; start with a freshly restored backend
+			backend.dispose();
 			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
-
 			try {
 				// the new namespace serializer is incompatible; this should fail the restore
 				backend.getPartitionedState(
 					new TestType("namespace", 123),
 					new TestSerializer(SerializerCompatibilityType.INCOMPATIBLE),
 					kvId);
+
+				Assert.fail("should have failed");
 			} catch (Exception e) {
 				Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
 			}
 
+			// test namespace serializer that requires migration; start with a freshly restored backend
+			backend.dispose();
+			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
 			try {
 				// the new namespace serializer requires migration; this should fail the restore
 				backend.getPartitionedState(
 					new TestType("namespace", 123),
 					new TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION),
 					kvId);
+
+				Assert.fail("should have failed");
 			} catch (Exception e) {
 				Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
 			}
@@ -685,10 +700,12 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 			descriptor = new MapStateDescriptor<>(
 				stateName,
 				IntSerializer.INSTANCE,
-				new TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION));
+				new TestSerializer(SerializerCompatibilityType.INCOMPATIBLE));
 
 			// the new value serializer is INCOMPATIBLE, so registering the state should fail
 			backend.getBroadcastState(descriptor);
+
+			Assert.fail("should have failed.");
 		} catch (Exception e) {
 			Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
 		} finally {
@@ -726,6 +743,8 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
 
 			// the new key serializer is INCOMPATIBLE, so registering the state should fail
 			backend.getBroadcastState(descriptor);
+
+			Assert.fail("should have failed.");
 		} catch (Exception e) {
 			Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
 		} finally {