You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/13 05:49:07 UTC

[10/15] flink git commit: [FLINK-6803] [tests] Enhancements to PojoSerializerUpgradeTest

[FLINK-6803] [tests] Enhancements to PojoSerializerUpgradeTest

1. Allow tests to ignore missing fields.
2. Add equivalent tests which use POJOs as managed operator state.

For 2, all tests have to be ignored for now until FLINK-6804 is fixed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/deaf4bfd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/deaf4bfd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/deaf4bfd

Branch: refs/heads/release-1.3
Commit: deaf4bfd5bc578239b0f67ba034e7ea328a30963
Parents: 34ff16e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 4 19:32:53 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 07:48:32 2017 +0200

----------------------------------------------------------------------
 .../PojoSerializerUpgradeTest.java              | 210 +++++++++++++++----
 1 file changed, 164 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/deaf4bfd/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index 2769c50..a925d43 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -46,6 +47,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.DynamicCodeLoadingException;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
@@ -169,21 +171,30 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 		"@Override public String toString() {return \"(\" + a + \")\";}}";
 
 	/**
-	 * We should be able to handle a changed field order
+	 * We should be able to handle a changed field order of a POJO as keyed state
 	 */
 	@Test
-	public void testChangedFieldOrder() throws Exception {
-		testPojoSerializerUpgrade(SOURCE_A, SOURCE_B);
+	public void testChangedFieldOrderWithKeyedState() throws Exception {
+		testPojoSerializerUpgrade(SOURCE_A, SOURCE_B, true, true);
 	}
 
 	/**
-	 * Changing field types should require a state migration
+	 * We should be able to handle a changed field order of a POJO as operator state
 	 */
+	@Ignore("Ignore this test until FLINK-6804 has been fixed.")
 	@Test
-	public void testChangedFieldTypes() throws Exception {
+	public void testChangedFieldOrderWithOperatorState() throws Exception {
+		testPojoSerializerUpgrade(SOURCE_A, SOURCE_B, true, false);
+	}
+
+	/**
+	 * Changing field types of a POJO as keyed state should require a state migration
+	 */
+	@Test
+	public void testChangedFieldTypesWithKeyedState() throws Exception {
 		assumeTrue("Running only for RocksDBStateBackend until FLINK-6804 has been fixed.", stateBackend instanceof RocksDBStateBackend);
 		try {
-			testPojoSerializerUpgrade(SOURCE_A, SOURCE_C);
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_C, true, true);
 			fail("Expected a state migration exception.");
 		} catch (Exception e) {
 			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
@@ -195,13 +206,31 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	}
 
 	/**
-	 * Adding fields should require a state migration
+	 * Changing field types of a POJO as operator state should require a state migration
 	 */
+	@Ignore("Ignore this test until FLINK-6804 has been fixed.")
 	@Test
-	public void testAdditionalField() throws Exception {
+	public void testChangedFieldTypesWithOperatorState() throws Exception {
+		try {
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_C, true, false);
+			fail("Expected a state migration exception.");
+		} catch (Exception e) {
+			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
+				// StateMigrationException expected
+			} else {
+				throw e;
+			}
+		}
+	}
+
+	/**
+	 * Adding fields to a POJO as keyed state should require a state migration
+	 */
+	@Test
+	public void testAdditionalFieldWithKeyedState() throws Exception {
 		assumeTrue("Running only for RocksDBStateBackend until FLINK-6804 has been fixed.", stateBackend instanceof RocksDBStateBackend);
 		try {
-			testPojoSerializerUpgrade(SOURCE_A, SOURCE_D);
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, true);
 			fail("Expected a state migration exception.");
 		} catch (Exception e) {
 			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
@@ -213,13 +242,49 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	}
 
 	/**
-	 * Removing fields should require a state migration
+	 * Adding fields to a POJO as operator state should require a state migration
+	 */
+	@Ignore("Ignore this test until FLINK-6804 has been fixed.")
+	@Test
+	public void testAdditionalFieldWithOperatorState() throws Exception {
+		try {
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, false);
+			fail("Expected a state migration exception.");
+		} catch (Exception e) {
+			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
+				// StateMigrationException expected
+			} else {
+				throw e;
+			}
+		}
+	}
+
+	/**
+	 * Removing fields from a POJO as keyed state should require a state migration
 	 */
 	@Ignore("Ignore this test until FLINK-6801 has been fixed.")
 	@Test
-	public void testMissingField() throws Exception {
+	public void testMissingFieldWithKeyedState() throws Exception {
+		try {
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, true);
+			fail("Expected a state migration exception.");
+		} catch (Exception e) {
+			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
+				// StateMigrationException expected
+			} else {
+				throw e;
+			}
+		}
+	}
+
+	/**
+	 * Removing fields from a POJO as operator state should require a state migration
+	 */
+	@Ignore("Ignore this test until FLINK-6804 has been fixed.")
+	@Test
+	public void testMissingFieldWithOperatorState() throws Exception {
 		try {
-			testPojoSerializerUpgrade(SOURCE_A, SOURCE_E);
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, false);
 			fail("Expected a state migration exception.");
 		} catch (Exception e) {
 			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
@@ -230,7 +295,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 		}
 	}
 
-	public void testPojoSerializerUpgrade(String classSourceA, String classSourceB) throws Exception {
+	public void testPojoSerializerUpgrade(String classSourceA, String classSourceB, boolean hasBField, boolean isKeyedState) throws Exception {
 		final Configuration taskConfiguration = new Configuration();
 		final ExecutionConfig executionConfig = new ExecutionConfig();
 		final KeySelector<Long, Long> keySelector = new IdentityKeySelector<>();
@@ -248,8 +313,9 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 		OperatorStateHandles stateHandles = runOperator(
 			taskConfiguration,
 			executionConfig,
-			new StreamMap<>(new StatefulMapper(true, false)),
+			new StreamMap<>(new StatefulMapper(isKeyedState, false, hasBField)),
 			keySelector,
+			isKeyedState,
 			stateBackend,
 			classLoader,
 			null,
@@ -268,8 +334,9 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 		runOperator(
 			taskConfiguration,
 			executionConfig,
-			new StreamMap<>(new StatefulMapper(true, true)),
+			new StreamMap<>(new StatefulMapper(isKeyedState, true, hasBField)),
 			keySelector,
+			isKeyedState,
 			stateBackend,
 			classLoaderB,
 			stateHandles,
@@ -281,6 +348,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			ExecutionConfig executionConfig,
 			OneInputStreamOperator<Long, Long> operator,
 			KeySelector<Long, Long> keySelector,
+			boolean isKeyedState,
 			StateBackend stateBackend,
 			ClassLoader classLoader,
 			OperatorStateHandles operatorStateHandles,
@@ -298,11 +366,17 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			0,
 			classLoader);
 
-		final KeyedOneInputStreamOperatorTestHarness<Long, Long, Long> harness = new KeyedOneInputStreamOperatorTestHarness<>(
-			operator,
-			keySelector,
-			BasicTypeInfo.LONG_TYPE_INFO,
-			environment);
+		OneInputStreamOperatorTestHarness<Long, Long> harness;
+
+		if (isKeyedState) {
+			harness = new KeyedOneInputStreamOperatorTestHarness<>(
+				operator,
+				keySelector,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				environment);
+		} else {
+			harness = new OneInputStreamOperatorTestHarness<>(operator, LongSerializer.INSTANCE, environment);
+		}
 
 		harness.setStateBackend(stateBackend);
 
@@ -350,18 +424,26 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 
 		private final boolean keyed;
 		private final boolean verify;
+		private final boolean hasBField;
+
+		// keyed states
+		private transient ValueState<Object> keyedValueState;
+		private transient MapState<Object, Object> keyedMapState;
+		private transient ListState<Object> keyedListState;
+		private transient ReducingState<Object> keyedReducingState;
+
+		// operator states
+		private transient ListState<Object> partitionableListState;
+		private transient ListState<Object> unionListState;
 
-		private transient ValueState<Object> valueState;
-		private transient MapState<Object, Object> mapState;
-		private transient ListState<Object> listState;
-		private transient ReducingState<Object> reducingState;
 		private transient Class<?> pojoClass;
 		private transient Field fieldA;
 		private transient Field fieldB;
 
-		public StatefulMapper(boolean keyed, boolean verify) {
+		public StatefulMapper(boolean keyed, boolean verify, boolean hasBField) {
 			this.keyed = keyed;
 			this.verify = verify;
+			this.hasBField = hasBField;
 		}
 
 		@Override
@@ -369,30 +451,54 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			Object pojo = pojoClass.newInstance();
 
 			fieldA.set(pojo, value);
-			fieldB.set(pojo, value + "");
+
+			if (hasBField) {
+				fieldB.set(pojo, value + "");
+			}
 
 			if (verify) {
-				assertEquals(pojo, valueState.value());
+				if (keyed) {
+					assertEquals(pojo, keyedValueState.value());
 
-				assertTrue(mapState.contains(pojo));
-				assertEquals(pojo, mapState.get(pojo));
+					assertTrue(keyedMapState.contains(pojo));
+					assertEquals(pojo, keyedMapState.get(pojo));
 
-				Iterator<Object> listIterator = listState.get().iterator();
+					Iterator<Object> listIterator = keyedListState.get().iterator();
 
-				boolean elementFound = false;
+					boolean elementFound = false;
 
-				while(listIterator.hasNext()) {
-					elementFound |= pojo.equals(listIterator.next());
-				}
+					while (listIterator.hasNext()) {
+						elementFound |= pojo.equals(listIterator.next());
+					}
+
+					assertTrue(elementFound);
 
-				assertTrue(elementFound);
+					assertEquals(pojo, keyedReducingState.get());
+				} else {
+					boolean elementFound = false;
+					Iterator<Object> listIterator = partitionableListState.get().iterator();
+					while (listIterator.hasNext()) {
+						elementFound |= pojo.equals(listIterator.next());
+					}
+					assertTrue(elementFound);
 
-				assertEquals(pojo, reducingState.get());
+					elementFound = false;
+					listIterator = unionListState.get().iterator();
+					while (listIterator.hasNext()) {
+						elementFound |= pojo.equals(listIterator.next());
+					}
+					assertTrue(elementFound);
+				}
 			} else {
-				valueState.update(pojo);
-				mapState.put(pojo, pojo);
-				listState.add(pojo);
-				reducingState.add(pojo);
+				if (keyed) {
+					keyedValueState.update(pojo);
+					keyedMapState.put(pojo, pojo);
+					keyedListState.add(pojo);
+					keyedReducingState.add(pojo);
+				} else {
+					partitionableListState.add(pojo);
+					unionListState.add(pojo);
+				}
 			}
 
 			return value;
@@ -408,17 +514,29 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			pojoClass = getRuntimeContext().getUserCodeClassLoader().loadClass(POJO_NAME);
 
 			fieldA = pojoClass.getDeclaredField("a");
-			fieldB = pojoClass.getDeclaredField("b");
 			fieldA.setAccessible(true);
-			fieldB.setAccessible(true);
+
+			if (hasBField) {
+				fieldB = pojoClass.getDeclaredField("b");
+				fieldB.setAccessible(true);
+			}
 
 			if (keyed) {
-				valueState = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("valueState", (Class<Object>) pojoClass));
-				mapState = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>("mapState", (Class<Object>) pojoClass, (Class<Object>) pojoClass));
-				listState = context.getKeyedStateStore().getListState(new ListStateDescriptor<>("listState", (Class<Object>) pojoClass));
+				keyedValueState = context.getKeyedStateStore().getState(
+					new ValueStateDescriptor<>("keyedValueState", (Class<Object>) pojoClass));
+				keyedMapState = context.getKeyedStateStore().getMapState(
+					new MapStateDescriptor<>("keyedMapState", (Class<Object>) pojoClass, (Class<Object>) pojoClass));
+				keyedListState = context.getKeyedStateStore().getListState(
+					new ListStateDescriptor<>("keyedListState", (Class<Object>) pojoClass));
 
 				ReduceFunction<Object> reduceFunction = new FirstValueReducer<>();
-				reducingState = context.getKeyedStateStore().getReducingState(new ReducingStateDescriptor<>("reducingState", reduceFunction, (Class<Object>) pojoClass));
+				keyedReducingState = context.getKeyedStateStore().getReducingState(
+					new ReducingStateDescriptor<>("keyedReducingState", reduceFunction, (Class<Object>) pojoClass));
+			} else {
+				partitionableListState = context.getOperatorStateStore().getListState(
+					new ListStateDescriptor<>("partitionableListState", (Class<Object>) pojoClass));
+				unionListState = context.getOperatorStateStore().getUnionListState(
+					new ListStateDescriptor<>("unionListState", (Class<Object>) pojoClass));
 			}
 		}
 	}