You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/27 15:18:25 UTC

[3/3] flink git commit: [FLINK-4696] [core] Limit number of Akka threads in local minicluster setups

[FLINK-4696] [core] Limit number of Akka threads in local minicluster setups

Since Flink uses a rather small number of actors, not too many actor dispatcher threads are needed.
To prevent mini cluster setups on multi-core CPUs (32 or 64 cores) to spawn too many threads,
this limits the number of dispatcher threads for mini clusters.

For proper Flink deployments, the threads are not limited by this change.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ea9284d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ea9284d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ea9284d

Branch: refs/heads/master
Commit: 6ea9284d29ec79576f073441a5de681019720ab0
Parents: e5d62da
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 27 14:21:20 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 27 14:58:41 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/akka/AkkaUtils.scala    | 19 +++++++++++++++++++
 .../runtime/minicluster/FlinkMiniCluster.scala   | 10 ++++++----
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6ea9284d/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 7aa75c0..bd3af33 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -189,6 +189,25 @@ object AkkaUtils {
     ConfigFactory.parseString(config)
   }
 
+  def testDispatcherConfig: Config = {
+    val config =
+      s"""
+         |akka {
+         |  actor {
+         |    default-dispatcher {
+         |      fork-join-executor {
+         |        parallelism-factor = 1.0
+         |        parallelism-min = 1
+         |        parallelism-max = 4
+         |      }
+         |    }
+         |  }
+         |}
+      """.stripMargin
+
+    ConfigFactory.parseString(config)
+  }
+
   /**
    * Creates a Akka config for a remote actor system listening on port on the network interface
    * identified by hostname.

http://git-wip-us.apache.org/repos/asf/flink/blob/6ea9284d/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 0178bd3..a263f66 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.concurrent._
-import scala.concurrent.forkjoin.ForkJoinPool
 
 /**
  * Abstract base class for Flink's mini cluster. The mini cluster starts a
@@ -266,17 +265,20 @@ abstract class FlinkMiniCluster(
 
   def startResourceManagerActorSystem(index: Int): ActorSystem = {
     val config = getResourceManagerAkkaConfig(index)
-    AkkaUtils.createActorSystem(config)
+    val testConfig = AkkaUtils.testDispatcherConfig.withFallback(config)
+    AkkaUtils.createActorSystem(testConfig)
   }
 
   def startJobManagerActorSystem(index: Int): ActorSystem = {
     val config = getJobManagerAkkaConfig(index)
-    AkkaUtils.createActorSystem(config)
+    val testConfig = AkkaUtils.testDispatcherConfig.withFallback(config)
+    AkkaUtils.createActorSystem(testConfig)
   }
 
   def startTaskManagerActorSystem(index: Int): ActorSystem = {
     val config = getTaskManagerAkkaConfig(index)
-    AkkaUtils.createActorSystem(config)
+    val testConfig = AkkaUtils.testDispatcherConfig.withFallback(config)
+    AkkaUtils.createActorSystem(testConfig)
   }
 
   def startJobClientActorSystem(jobID: JobID): ActorSystem = {