You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/29 11:10:05 UTC

[GitHub] GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and remove TaskManagerConfigurationTest

GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and remove TaskManagerConfigurationTest
URL: https://github.com/apache/flink/pull/7525#discussion_r251788642
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
 ##########
 @@ -16,125 +16,97 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.taskmanager;
+package org.apache.flink.runtime.taskexecutor;
 
 import net.jcip.annotations.NotThreadSafe;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.IOUtils;
 import org.junit.Rule;
 import org.junit.Test;
 
 import org.junit.rules.TemporaryFolder;
-import scala.Tuple2;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.URI;
-import java.util.Iterator;
 
 import static org.junit.Assert.*;
 
 /**
- * Validates that the TaskManager startup properly obeys the configuration
+ * Validates that the TaskManagerRunner startup properly obeys the configuration
  * values.
  *
  * NOTE: at least {@link #testDefaultFsParameterLoading()} should not be run in parallel to other
  * tests in the same JVM as it modifies a static (private) member of the {@link FileSystem} class
  * and verifies its content.
  */
 @NotThreadSafe
-public class TaskManagerConfigurationTest {
+public class TaskManagerRunnerConfigurationTest {
 
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	@Test
-	public void testUsePreconfiguredNetworkInterface() throws Exception {
+	public void testUsePreconfiguredRpcService() throws Exception {
 		final String TEST_HOST_NAME = "testhostname";
 
 		Configuration config = new Configuration();
 		config.setString(TaskManagerOptions.HOST, TEST_HOST_NAME);
 		config.setString(JobManagerOptions.ADDRESS, "localhost");
 		config.setInteger(JobManagerOptions.PORT, 7891);
 
-		HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-			config,
-			Executors.directExecutor(),
-			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-		try {
-
-			Tuple2<String, Iterator<Integer>> address = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices);
-
-			// validate the configured test host name
-			assertEquals(TEST_HOST_NAME, address._1());
-		} finally {
-			highAvailabilityServices.closeAndCleanupAllData();
-		}
-	}
-
-	@Test
-	public void testActorSystemPortConfig() throws Exception {
-		// config with pre-configured hostname to speed up tests (no interface selection)
-		Configuration config = new Configuration();
-		config.setString(TaskManagerOptions.HOST, "localhost");
-		config.setString(JobManagerOptions.ADDRESS, "localhost");
-		config.setInteger(JobManagerOptions.PORT, 7891);
-
 		HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
 			config,
 			Executors.directExecutor(),
 			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
 		try {
 			// auto port
-			Iterator<Integer> portsIter = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._2();
-			assertTrue(portsIter.hasNext());
-			assertEquals(0, (int) portsIter.next());
+			RpcService rpcService = TaskManagerRunner.createRpcService(config, highAvailabilityServices);
+			assertTrue(rpcService.getPort() >= 0);
+			// pre-defined host name
+			assertEquals(TEST_HOST_NAME, rpcService.getAddress());
 
 			// pre-defined port
 			final int testPort = 22551;
 			config.setString(TaskManagerOptions.RPC_PORT, String.valueOf(testPort));
-
-			portsIter = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._2();
-			assertTrue(portsIter.hasNext());
-			assertEquals(testPort, (int) portsIter.next());
+			rpcService = TaskManagerRunner.createRpcService(config, highAvailabilityServices);
 
 Review comment:
   This will eagerly create an actor system that listens on port 22551. Previously, the method under test only returned a tuple of the hostname and a port iterator. There are two problems with the current approach:
   1. We are not shutting down the actor systems
   2. The port `22551` might already be occupied. In that case, creating the actor system would fail.
   
   We can extract the respective logic out of `createRpcService` and test that. Can I take over, or do you want to continue working on this?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services