You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/12/11 09:47:46 UTC
[flink] 01/15: [hotfix] [state backend,
tests] Certain StateBackendMigrationTestBase tests should fail if
exception isn't thrown
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bc41b756ff180d9903ba22c0d83635c38a11098f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Dec 7 20:42:52 2018 +0800
[hotfix] [state backend, tests] Certain StateBackendMigrationTestBase tests should fail if exception isn't thrown
This commit strengthens tests in StateBackendMigrationTestBase that
depend on a certain state operation (restoring state, accessing state,
etc.) to be failing to assert correct behaviour. However, we previously
do not really fail the test if no exception was thrown when there should
be.
This also caught some bugs in the test itself which had the tests
verifying incorrect behaviour.
---
.../state/StateBackendMigrationTestBase.java | 27 ++++++++++++++++++----
1 file changed, 23 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index f5f30d5..dff49f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ -245,6 +245,8 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
// the new serializer is INCOMPATIBLE, so registering the state should fail
backend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
+
+ Assert.fail("should have failed");
} catch (Exception e) {
Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
}finally {
@@ -288,10 +290,12 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
kvId = new ListStateDescriptor<>(
stateName,
- new TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION));
+ new TestSerializer(SerializerCompatibilityType.INCOMPATIBLE));
// the new serializer is INCOMPATIBLE, so registering the state should fail
backend.getPartitionedState(VoidNamespace.INSTANCE, CustomVoidNamespaceSerializer.INSTANCE, kvId);
+
+ Assert.fail("should have failed");
} catch (Exception e) {
Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
} finally {
@@ -358,6 +362,8 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
try {
// the new key serializer is incompatible; this should fail the restore
restoreKeyedBackend(new TestSerializer(SerializerCompatibilityType.INCOMPATIBLE), snapshot);
+
+ Assert.fail("should have failed");
} catch (Exception e) {
Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
}
@@ -365,6 +371,8 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
try {
// the new key serializer requires migration; this should fail the restore
restoreKeyedBackend(new TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION), snapshot);
+
+ Assert.fail("should have failed");
} catch (Exception e) {
Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
}
@@ -397,26 +405,33 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
KeyedStateHandle snapshot = runSnapshot(
backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
sharedStateRegistry);
- backend.dispose();
+ // test incompatible namespace serializer; start with a freshly restored backend
+ backend.dispose();
backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
-
try {
// the new namespace serializer is incompatible; this should fail the restore
backend.getPartitionedState(
new TestType("namespace", 123),
new TestSerializer(SerializerCompatibilityType.INCOMPATIBLE),
kvId);
+
+ Assert.fail("should have failed");
} catch (Exception e) {
Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
}
+ // test namespace serializer that requires migration; start with a freshly restored backend
+ backend.dispose();
+ backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
try {
// the new namespace serializer requires migration; this should fail the restore
backend.getPartitionedState(
new TestType("namespace", 123),
new TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION),
kvId);
+
+ Assert.fail("should have failed");
} catch (Exception e) {
Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
}
@@ -685,10 +700,12 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
descriptor = new MapStateDescriptor<>(
stateName,
IntSerializer.INSTANCE,
- new TestSerializer(SerializerCompatibilityType.REQUIRES_MIGRATION));
+ new TestSerializer(SerializerCompatibilityType.INCOMPATIBLE));
// the new value serializer is INCOMPATIBLE, so registering the state should fail
backend.getBroadcastState(descriptor);
+
+ Assert.fail("should have failed.");
} catch (Exception e) {
Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
} finally {
@@ -726,6 +743,8 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke
// the new key serializer is INCOMPATIBLE, so registering the state should fail
backend.getBroadcastState(descriptor);
+
+ Assert.fail("should have failed.");
} catch (Exception e) {
Assert.assertTrue(ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
} finally {