You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/05/28 15:46:38 UTC

[flink] 02/07: [hotfix][tests] Shut down TaskManagerServices

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b417353072b84c272fee818d4bfcead90c3a565c
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 27 13:00:10 2020 +0200

    [hotfix][tests] Shut down TaskManagerServices
---
 .../TaskExecutorLocalStateStoresManagerTest.java   | 56 ++++++++++++----------
 1 file changed, 32 insertions(+), 24 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 13e5e69..49d6fb4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -69,23 +69,27 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 
 		TaskManagerServices taskManagerServices = createTaskManagerServices(createTaskManagerServiceConfiguration(config));
 
-		TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
-
-		// verify configured directories for local state
-		String[] split = rootDirString.split(",");
-		File[] rootDirectories = taskStateManager.getLocalStateRootDirectories();
-		for (int i = 0; i < split.length; ++i) {
-			Assert.assertEquals(
-				new File(split[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
-				rootDirectories[i]);
-		}
+		try {
+			TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
+
+			// verify configured directories for local state
+			String[] split = rootDirString.split(",");
+			File[] rootDirectories = taskStateManager.getLocalStateRootDirectories();
+			for (int i = 0; i < split.length; ++i) {
+				Assert.assertEquals(
+					new File(split[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+					rootDirectories[i]);
+			}
 
-		// verify local recovery mode
-		Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled());
+			// verify local recovery mode
+			Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled());
 
-		Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
-		for (File rootDirectory : rootDirectories) {
-			FileUtils.deleteFileOrDirectory(rootDirectory);
+			Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
+			for (File rootDirectory : rootDirectories) {
+				FileUtils.deleteFileOrDirectory(rootDirectory);
+			}
+		} finally {
+			taskManagerServices.shutDown();
 		}
 	}
 
@@ -102,18 +106,22 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 
 		TaskManagerServices taskManagerServices = createTaskManagerServices(taskManagerServicesConfiguration);
 
-		TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
+		try {
+			TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
 
-		String[] tmpDirPaths = taskManagerServicesConfiguration.getTmpDirPaths();
-		File[] localStateRootDirectories = taskStateManager.getLocalStateRootDirectories();
+			String[] tmpDirPaths = taskManagerServicesConfiguration.getTmpDirPaths();
+			File[] localStateRootDirectories = taskStateManager.getLocalStateRootDirectories();
 
-		for (int i = 0; i < tmpDirPaths.length; ++i) {
-			Assert.assertEquals(
-				new File(tmpDirPaths[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
-				localStateRootDirectories[i]);
-		}
+			for (int i = 0; i < tmpDirPaths.length; ++i) {
+				Assert.assertEquals(
+					new File(tmpDirPaths[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+					localStateRootDirectories[i]);
+			}
 
-		Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled());
+			Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled());
+		} finally {
+			taskManagerServices.shutDown();
+		}
 	}
 
 	/**