You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/18 06:40:58 UTC

[GitHub] asfgit closed pull request #6845: [FLINK-10511][Cluster Management] Reuse the port selection and RPC se…

asfgit closed pull request #6845: [FLINK-10511][Cluster Management] Reuse the port selection and RPC se…
URL: https://github.com/apache/flink/pull/6845
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f528bc4bcc4..cc12ff5ee5c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -33,7 +33,6 @@
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
@@ -50,7 +49,7 @@
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -83,10 +82,6 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR;
-
 /**
  * Base class for the Flink cluster entry points.
  *
@@ -252,7 +247,7 @@ protected void initializeServices(Configuration configuration) throws Exception
 			final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
 			final String portRange = getRPCPortRange(configuration);
 
-			commonRpcService = createRpcService(configuration, bindAddress, portRange);
+			commonRpcService = AkkaRpcServiceUtils.createRpcService(bindAddress, portRange, configuration);
 
 			// update the configuration used to create the high availability services
 			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
@@ -293,15 +288,6 @@ protected String getRPCPortRange(Configuration configuration) {
 		}
 	}
 
-	protected RpcService createRpcService(
-			Configuration configuration,
-			String bindAddress,
-			String portRange) throws Exception {
-		ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, FORK_JOIN_EXECUTOR);
-		FiniteDuration duration = AkkaUtils.getTimeout(configuration);
-		return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
-	}
-
 	protected HighAvailabilityServices createHaServices(
 		Configuration configuration,
 		Executor executor) throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 3ee7641f717..28f04f7677d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -22,6 +22,7 @@
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
 import org.apache.flink.runtime.net.SSLUtils;
@@ -30,8 +31,6 @@
 import org.apache.flink.util.Preconditions;
 
 import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
-import org.jboss.netty.channel.ChannelException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +59,25 @@
 	//  RPC instantiation
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Utility method to create RPC service from configuration and hostname, port.
+	 *
+	 * @param hostname   The hostname/address that describes the TaskManager's data location.
+	 * @param portRangeDefinition   The port range to start TaskManager on.
+	 * @param configuration                 The configuration for the TaskManager.
+	 * @return   The rpc service which is used to start and connect to the TaskManager RpcEndpoint .
+	 * @throws IOException      Thrown, if the actor system can not bind to the address
+	 * @throws Exception      Thrown is some other error occurs while creating akka actor system
+	 */
+	public static RpcService createRpcService(
+		String hostname,
+		String portRangeDefinition,
+		Configuration configuration) throws Exception {
+		final ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, hostname, portRangeDefinition, LOG);
+		final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
+		return new AkkaRpcService(actorSystem, timeout);
+	}
+
 	/**
 	 * Utility method to create RPC service from configuration and hostname, port.
 	 *
@@ -75,35 +93,7 @@ public static RpcService createRpcService(
 		int port,
 		Configuration configuration) throws Exception {
 		LOG.info("Starting AkkaRpcService at {}.", NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port));
-
-		final ActorSystem actorSystem;
-
-		try {
-			Config akkaConfig;
-
-			if (hostname != null && !hostname.isEmpty()) {
-				// remote akka config
-				akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
-			} else {
-				// local akka config
-				akkaConfig = AkkaUtils.getAkkaConfig(configuration);
-			}
-
-			LOG.debug("Using akka configuration \n {}.", akkaConfig);
-
-			actorSystem = AkkaUtils.createActorSystem(akkaConfig);
-		} catch (Throwable t) {
-			if (t instanceof ChannelException) {
-				Throwable cause = t.getCause();
-				if (cause != null && t.getCause() instanceof java.net.BindException) {
-					String address = NetUtils.hostAndPortToUrlString(hostname, port);
-					throw new IOException("Unable to bind AkkaRpcService actor system to address " +
-						address + " - " + cause.getMessage(), t);
-				}
-			}
-			throw new Exception("Could not create TaskManager actor system", t);
-		}
-
+		final ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, hostname, port, LOG);
 		final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
 		return new AkkaRpcService(actorSystem, timeout);
 	}
@@ -144,7 +134,6 @@ public static String getRpcUrl(
 	}
 
 	/**
-	 * 
 	 * @param hostname The hostname or address where the target RPC service is listening.
 	 * @param port The port where the target RPC service is listening.
 	 * @param endpointName The name of the RPC endpoint.
@@ -204,6 +193,6 @@ public static String createRandomName(String prefix) {
 
 	// ------------------------------------------------------------------------
 
-	/** This class is not meant to be instantiated */
+	/** This class is not meant to be instantiated. */
 	private AkkaRpcServiceUtils() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 15fa4150b1f..2f54172bf6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -56,7 +56,6 @@
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
-import org.apache.flink.util.NetUtils;
 
 import akka.actor.ActorSystem;
 import org.slf4j.Logger;
@@ -68,7 +67,6 @@
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -426,39 +424,14 @@ public static RpcService createRpcService(
 		}
 
 		final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT);
-
-		return bindWithPort(configuration, taskManagerHostname, portRangeDefinition);
-	}
-
-	private static RpcService bindWithPort(
-		Configuration configuration,
-		String taskManagerHostname,
-		String portRangeDefinition) throws Exception{
-
-		// parse port range definition and create port iterator
-		Iterator<Integer> portsIterator;
 		try {
-			portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
+			return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portRangeDefinition, configuration);
 		} catch (Exception e) {
-			throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
-		}
-
-		while (portsIterator.hasNext()) {
-			try {
-				return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portsIterator.next(), configuration);
-			}
-			catch (Exception e) {
-				// we can continue to try if this contains a netty channel exception
-				Throwable cause = e.getCause();
-				if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
-					cause instanceof java.net.BindException)) {
-					throw e;
-				} // else fall through the loop and try the next port
+			if (e instanceof BindException) {
+				throw new BindException("Could not start task manager on any port in port range "
+					+ portRangeDefinition);
 			}
+			throw e;
 		}
-
-		// if we come here, we have exhausted the port range
-		throw new BindException("Could not start task manager on any port in port range "
-			+ portRangeDefinition);
 	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services