You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/08/15 11:35:59 UTC

[flink] branch release-1.6 updated: [FLINK-9859] [runtime] More Akka config

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 6bca401  [FLINK-9859] [runtime] More Akka config
6bca401 is described below

commit 6bca4011b605dcdff0f8cdd3e9aa7b2a44c106cc
Author: 陈梓立 <wa...@gmail.com>
AuthorDate: Mon Jul 30 09:54:02 2018 +0800

    [FLINK-9859] [runtime] More Akka config
    
    This closes #6339.
---
 docs/_includes/generated/akka_configuration.html   | 45 +++++++++++++
 .../apache/flink/configuration/AkkaOptions.java    | 77 ++++++++++++++++++++++
 .../runtime/clusterframework/BootstrapTools.java   |  9 +--
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java      |  4 +-
 .../org/apache/flink/runtime/akka/AkkaUtils.scala  | 67 ++++++++++++++++---
 .../flink/runtime/jobmanager/JobManager.scala      |  1 -
 6 files changed, 183 insertions(+), 20 deletions(-)

diff --git a/docs/_includes/generated/akka_configuration.html b/docs/_includes/generated/akka_configuration.html
index 352c656..f5a2a5d 100644
--- a/docs/_includes/generated/akka_configuration.html
+++ b/docs/_includes/generated/akka_configuration.html
@@ -13,11 +13,41 @@
             <td>Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d).</td>
         </tr>
         <tr>
+            <td><h5>akka.client-socket-worker-pool.pool-size-factor</h5></td>
+            <td style="word-wrap: break-word;">1.0</td>
+            <td>The pool size factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the pool-size-min and pool-size-max values.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.client-socket-worker-pool.pool-size-max</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Max number of threads to cap factor-based number to.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.client-socket-worker-pool.pool-size-min</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Min number of threads to cap factor-based number to.</td>
+        </tr>
+        <tr>
             <td><h5>akka.client.timeout</h5></td>
             <td style="word-wrap: break-word;">"60 s"</td>
             <td>Timeout for all blocking calls on the client side.</td>
         </tr>
         <tr>
+            <td><h5>akka.fork-join-executor.parallelism-factor</h5></td>
+            <td style="word-wrap: break-word;">2.0</td>
+            <td>The parallelism factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the parallelism-min and parallelism-max values.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.fork-join-executor.parallelism-max</h5></td>
+            <td style="word-wrap: break-word;">64</td>
+            <td>Max number of threads to cap factor-based parallelism number to.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.fork-join-executor.parallelism-min</h5></td>
+            <td style="word-wrap: break-word;">8</td>
+            <td>Min number of threads to cap factor-based parallelism number to.</td>
+        </tr>
+        <tr>
             <td><h5>akka.framesize</h5></td>
             <td style="word-wrap: break-word;">"10485760b"</td>
             <td>Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier.</td>
@@ -43,6 +73,21 @@
             <td>Milliseconds a gate should be closed for after a remote connection was disconnected.</td>
         </tr>
         <tr>
+            <td><h5>akka.server-socket-worker-pool.pool-size-factor</h5></td>
+            <td style="word-wrap: break-word;">1.0</td>
+            <td>The pool size factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the pool-size-min and pool-size-max values.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.server-socket-worker-pool.pool-size-max</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Max number of threads to cap factor-based number to.</td>
+        </tr>
+        <tr>
+            <td><h5>akka.server-socket-worker-pool.pool-size-min</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Min number of threads to cap factor-based number to.</td>
+        </tr>
+        <tr>
             <td><h5>akka.ssl.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
             <td>Turns on SSL for Akka’s remote communication. This is applicable only when the global ssl flag security.ssl.enabled is set to true.</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 43c7876..02234b9 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -194,4 +194,81 @@ public class AkkaOptions {
 		.key("akka.retry-gate-closed-for")
 		.defaultValue(50L)
 		.withDescription("Milliseconds a gate should be closed for after a remote connection was disconnected.");
+
+	// ==================================================
+	// Configurations for fork-join-executor.
+	// ==================================================
+
+	public static final ConfigOption<Double> FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR = ConfigOptions
+		.key("akka.fork-join-executor.parallelism-factor")
+		.defaultValue(2.0)
+		.withDescription(Description.builder()
+			.text("The parallelism factor is used to determine thread pool size using the" +
+				" following formula: ceil(available processors * factor). Resulting size" +
+				" is then bounded by the parallelism-min and parallelism-max values."
+			).build());
+
+	public static final ConfigOption<Integer> FORK_JOIN_EXECUTOR_PARALLELISM_MIN = ConfigOptions
+		.key("akka.fork-join-executor.parallelism-min")
+		.defaultValue(8)
+		.withDescription(Description.builder()
+			.text("Min number of threads to cap factor-based parallelism number to.").build());
+
+	public static final ConfigOption<Integer> FORK_JOIN_EXECUTOR_PARALLELISM_MAX = ConfigOptions
+		.key("akka.fork-join-executor.parallelism-max")
+		.defaultValue(64)
+		.withDescription(Description.builder()
+			.text("Max number of threads to cap factor-based parallelism number to.").build());
+
+	// ==================================================
+	// Configurations for client-socket-worker-pool.
+	// ==================================================
+
+	public static final ConfigOption<Integer> CLIENT_SOCKET_WORKER_POOL_SIZE_MIN = ConfigOptions
+		.key("akka.client-socket-worker-pool.pool-size-min")
+		.defaultValue(1)
+		.withDescription(Description.builder()
+			.text("Min number of threads to cap factor-based number to.").build());
+
+	public static final ConfigOption<Integer> CLIENT_SOCKET_WORKER_POOL_SIZE_MAX = ConfigOptions
+		.key("akka.client-socket-worker-pool.pool-size-max")
+		.defaultValue(2)
+		.withDescription(Description.builder()
+			.text("Max number of threads to cap factor-based number to.").build());
+
+	public static final ConfigOption<Double> CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR = ConfigOptions
+		.key("akka.client-socket-worker-pool.pool-size-factor")
+		.defaultValue(1.0)
+		.withDescription(Description.builder()
+			.text("The pool size factor is used to determine thread pool size" +
+				" using the following formula: ceil(available processors * factor)." +
+				" Resulting size is then bounded by the pool-size-min and" +
+				" pool-size-max values."
+			).build());
+
+	// ==================================================
+	// Configurations for server-socket-worker-pool.
+	// ==================================================
+
+	public static final ConfigOption<Integer> SERVER_SOCKET_WORKER_POOL_SIZE_MIN = ConfigOptions
+		.key("akka.server-socket-worker-pool.pool-size-min")
+		.defaultValue(1)
+		.withDescription(Description.builder()
+			.text("Min number of threads to cap factor-based number to.").build());
+
+	public static final ConfigOption<Integer> SERVER_SOCKET_WORKER_POOL_SIZE_MAX = ConfigOptions
+		.key("akka.server-socket-worker-pool.pool-size-max")
+		.defaultValue(2)
+		.withDescription(Description.builder()
+			.text("Max number of threads to cap factor-based number to.").build());
+
+	public static final ConfigOption<Double> SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR = ConfigOptions
+		.key("akka.server-socket-worker-pool.pool-size-factor")
+		.defaultValue(1.0)
+		.withDescription(Description.builder()
+			.text("The pool size factor is used to determine thread pool size" +
+				" using the following formula: ceil(available processors * factor)." +
+				" Resulting size is then bounded by the pool-size-min and" +
+				" pool-size-max values."
+			).build());
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 4e43480..56e4576 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -104,14 +104,7 @@ public class BootstrapTools {
 		while (portsIterator.hasNext()) {
 			// first, we check if the port is available by opening a socket
 			// if the actor system fails to start on the port, we try further
-			ServerSocket availableSocket = NetUtils.createSocketFromPorts(
-				portsIterator,
-				new NetUtils.SocketFactory() {
-					@Override
-					public ServerSocket createSocket(int port) throws IOException {
-						return new ServerSocket(port);
-					}
-				});
+			ServerSocket availableSocket = NetUtils.createSocketFromPorts(portsIterator, ServerSocket::new);
 
 			int port;
 			if (availableSocket == null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 982a536..3a62698 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -52,7 +52,7 @@ public class AkkaRpcServiceUtils {
 	private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcServiceUtils.class);
 
 	private static final String AKKA_TCP = "akka.tcp";
-	private static final String AkKA_SSL_TCP = "akka.ssl.tcp";
+	private static final String AKKA_SSL_TCP = "akka.ssl.tcp";
 
 	private static final AtomicLong nextNameOffset = new AtomicLong(0L);
 
@@ -162,7 +162,7 @@ public class AkkaRpcServiceUtils {
 		checkNotNull(endpointName, "endpointName is null");
 		checkArgument(port > 0 && port <= 65535, "port must be in [1, 65535]");
 
-		final String protocolPrefix = akkaProtocol == AkkaProtocol.SSL_TCP ? AkKA_SSL_TCP : AKKA_TCP;
+		final String protocolPrefix = akkaProtocol == AkkaProtocol.SSL_TCP ? AKKA_SSL_TCP : AKKA_TCP;
 
 		if (addressResolution == AddressResolution.TRY_ADDRESS_RESOLUTION) {
 			// Fail fast if the hostname cannot be resolved
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index b58bfe1..050b4de 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.net.SSLUtils
 import org.apache.flink.util.NetUtils
 import org.jboss.netty.channel.ChannelException
 import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}
 
 import scala.annotation.tailrec
 import scala.concurrent._
@@ -44,9 +44,9 @@ import scala.language.postfixOps
  * actor systems resides in this class.
  */
 object AkkaUtils {
-  val LOG = LoggerFactory.getLogger(AkkaUtils.getClass)
+  val LOG: Logger = LoggerFactory.getLogger(AkkaUtils.getClass)
 
-  val INF_TIMEOUT = 21474835 seconds
+  val INF_TIMEOUT: FiniteDuration = 21474835 seconds
 
   /**
    * Creates a local actor system without remoting.
@@ -124,7 +124,9 @@ object AkkaUtils {
     * @param port to bind against
     * @return A remote Akka config
     */
-  def getAkkaConfig(configuration: Configuration, hostname: String, port: Int): Config = {
+  def getAkkaConfig(configuration: Configuration,
+                    hostname: String,
+                    port: Int): Config = {
     getAkkaConfig(configuration, Some((hostname, port)))
   }
 
@@ -203,6 +205,24 @@ object AkkaUtils {
     val supervisorStrategy = classOf[StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
       .getCanonicalName
 
+    val forkJoinExecutorParallelismFactor =
+      configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
+
+    val forkJoinExecutorParallelismMin =
+      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
+
+    val forkJoinExecutorParallelismMax =
+      configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
+
+    val forkJoinExecutorConfig =
+      s"""
+         | fork-join-executor {
+         |   parallelism-factor = $forkJoinExecutorParallelismFactor
+         |   parallelism-min = $forkJoinExecutorParallelismMin
+         |   parallelism-max = $forkJoinExecutorParallelismMax
+         | }
+       """.stripMargin
+
     val config =
       s"""
         |akka {
@@ -230,9 +250,7 @@ object AkkaUtils {
         |   default-dispatcher {
         |     throughput = $akkaThroughput
         |
-        |     fork-join-executor {
-        |       parallelism-factor = 2.0
-        |     }
+        |   $forkJoinExecutorConfig
         |   }
         | }
         |}
@@ -263,7 +281,7 @@ object AkkaUtils {
   private def validateHeartbeat(pauseParamName: String,
                                 pauseValue: String,
                                 intervalParamName: String,
-                                intervalValue: String) = {
+                                intervalValue: String): Unit = {
     if (Duration.apply(pauseValue).lteq(Duration.apply(intervalValue))) {
       throw new IllegalConfigurationException(
         "%s [%s] must greater then %s [%s]",
@@ -367,6 +385,25 @@ object AkkaUtils {
     val akkaSSLAlgorithmsString = configuration.getString(SecurityOptions.SSL_ALGORITHMS)
     val akkaSSLAlgorithms = akkaSSLAlgorithmsString.split(",").toList.mkString("[", ",", "]")
 
+    val clientSocketWorkerPoolPoolSizeMin =
+      configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN)
+
+    val clientSocketWorkerPoolPoolSizeMax =
+      configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX)
+
+    val clientSocketWorkerPoolPoolSizeFactor =
+      configuration.getDouble(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR)
+
+    val serverSocketWorkerPoolPoolSizeMin =
+      configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN)
+
+    val serverSocketWorkerPoolPoolSizeMax =
+      configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX)
+
+    val serverSocketWorkerPoolPoolSizeFactor =
+      configuration.getDouble(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR)
+
+
     val configString =
       s"""
          |akka {
@@ -397,6 +434,18 @@ object AkkaUtils {
          |        connection-timeout = $akkaTCPTimeout
          |        maximum-frame-size = $akkaFramesize
          |        tcp-nodelay = on
+         |
+         |        client-socket-worker-pool {
+         |          pool-size-min = $clientSocketWorkerPoolPoolSizeMin
+         |          pool-size-max = $clientSocketWorkerPoolPoolSizeMax
+         |          pool-size-factor = $clientSocketWorkerPoolPoolSizeFactor
+         |        }
+         |
+         |        server-socket-worker-pool {
+         |          pool-size-min = $serverSocketWorkerPoolPoolSizeMin
+         |          pool-size-max = $serverSocketWorkerPoolPoolSizeMax
+         |          pool-size-factor = $serverSocketWorkerPoolPoolSizeFactor
+         |        }
          |      }
          |    }
          |
@@ -790,7 +839,7 @@ object AkkaUtils {
           retryOnBindException(fn, stopCond)
         }
       case scala.util.Failure(x: Exception) => x.getCause match {
-        case c: ChannelException =>
+        case _: ChannelException =>
           if (stopCond) {
             scala.util.Failure(new RuntimeException(
               "Unable to do further retries starting the actor system"))
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2a8f492..4f0709e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2139,7 +2139,6 @@ object JobManager {
       configuration: Configuration,
       externalHostname: String,
       port: Int): ActorSystem = {
-
     // Bring up the job manager actor system first, bind it to the given address.
     val jobManagerSystem = BootstrapTools.startActorSystem(
       configuration,