You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/13 05:49:06 UTC

[09/15] flink git commit: [hotfix] [tests] Fix failing tests in AsyncWaitOperatorTest and StateBackendTestBase

[hotfix] [tests] Fix failing tests in AsyncWaitOperatorTest and StateBackendTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34ff16e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34ff16e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34ff16e9

Branch: refs/heads/release-1.3
Commit: 34ff16e9f6ef45ae7eab4334c45e43046e2ea514
Parents: 1c2361e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 4 15:51:12 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 07:48:32 2017 +0200

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java | 10 +++++-----
 .../streaming/util/AbstractStreamOperatorTestHarness.java |  5 ++++-
 2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/34ff16e9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 5a35f0a..44046fb 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1065,7 +1065,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws RocksDBException
 		 */
 		public void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
-				throws IOException, ClassNotFoundException, RocksDBException {
+				throws IOException, StateMigrationException, ClassNotFoundException, RocksDBException {
 
 			rocksDBKeyedStateBackend.createDB();
 
@@ -1091,7 +1091,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws ClassNotFoundException
 		 */
 		private void restoreKeyGroupsInStateHandle()
-				throws IOException, RocksDBException, ClassNotFoundException {
+				throws IOException, StateMigrationException, RocksDBException, ClassNotFoundException {
 			try {
 				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
 				rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
@@ -1113,7 +1113,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws ClassNotFoundException
 		 * @throws RocksDBException
 		 */
-		private void restoreKVStateMetaData() throws IOException, RocksDBException {
+		private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
 
 			KeyedBackendSerializationProxy<K> serializationProxy =
 					new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
@@ -1130,7 +1130,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				.isRequiresMigration()) {
 
 				// TODO replace with state migration; note that key hash codes need to remain the same after migration
-				throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+				throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
 					"Aborting now since state migration is currently not available");
 			}
 
@@ -1250,7 +1250,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					.isRequiresMigration()) {
 
 					// TODO replace with state migration; note that key hash codes need to remain the same after migration
-					throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+					throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
 						"Aborting now since state migration is currently not available");
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34ff16e9/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index f1aef53..5ff1b33 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -184,7 +184,10 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
 		when(mockTask.getEnvironment()).thenReturn(environment);
 		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-		when(mockTask.getUserCodeClassLoader()).thenReturn(environment.getUserClassLoader());
+
+		ClassLoader cl = environment.getUserClassLoader();
+		when(mockTask.getUserCodeClassLoader()).thenReturn(cl);
+
 		when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
 		when(mockTask.getStreamStatusMaintainer()).thenReturn(mockStreamStatusMaintainer);