You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nlpcraft.apache.org by ar...@apache.org on 2021/01/25 23:24:01 UTC

[incubator-nlpcraft] branch NLPCRAFT-111 updated: WIP & refactoring.

This is an automated email from the ASF dual-hosted git repository.

aradzinski pushed a commit to branch NLPCRAFT-111
in repository https://gitbox.apache.org/repos/asf/incubator-nlpcraft.git


The following commit(s) were added to refs/heads/NLPCRAFT-111 by this push:
     new d77878a  WIP & refactoring.
d77878a is described below

commit d77878ab765e553f77933725bfb7e48dc2c9938f
Author: Aaron Radzinski <ar...@datalingvo.com>
AuthorDate: Mon Jan 25 15:08:44 2021 -0800

    WIP & refactoring.
---
 nlpcraft/src/main/resources/nlpcraft.conf          |  13 ---
 .../nlpcraft/common/pool/NCThreadPoolManager.scala | 117 ++++++++++-----------
 2 files changed, 57 insertions(+), 73 deletions(-)

diff --git a/nlpcraft/src/main/resources/nlpcraft.conf b/nlpcraft/src/main/resources/nlpcraft.conf
index b0ecf3c..1b1fa9d 100644
--- a/nlpcraft/src/main/resources/nlpcraft.conf
+++ b/nlpcraft/src/main/resources/nlpcraft.conf
@@ -207,13 +207,6 @@ nlpcraft {
 
         # 'ctxword' server endpoint URL.
         ctxword.url="http://localhost:5000"
-
-        # TODO: some description (pool name - max thread size)
-        # min 0, keepAliveTime = 1 min
-        # pools = {
-        #     "probes.communication": 10,
-        #     "probe.requests": 10
-        # }
     }
 
     # Basic NLP toolkit to use on both server and probes. Possible values:
@@ -331,11 +324,5 @@ nlpcraft {
         # Maximum execution result size in bytes. Default value is 1M.
         # When exceeded the request will be automatically rejected.
         resultMaxSizeBytes = 1048576
-
-        # TODO: some description (pool name - max thread size)
-        # min 0, keepAliveTime = 1 min
-        # pools = {
-        #     "model.solver.pool" = 10
-        # }
     }
 }
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCThreadPoolManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCThreadPoolManager.scala
index 0f4989f..f3df18f 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCThreadPoolManager.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCThreadPoolManager.scala
@@ -18,10 +18,7 @@
 package org.apache.nlpcraft.common.pool
 
 import io.opencensus.trace.Span
-import org.apache.nlpcraft.common.config.NCConfigurable
-import org.apache.nlpcraft.common.module.NCModule
-import org.apache.nlpcraft.common.module.NCModule._
-import org.apache.nlpcraft.common.{NCE, NCService, U}
+import org.apache.nlpcraft.common._
 
 import java.util.concurrent._
 import scala.collection.JavaConverters._
@@ -31,68 +28,67 @@ import scala.concurrent.ExecutionContext
  * Common thread pool manager.
  */
 object NCThreadPoolManager extends NCService {
-    private final val KEEP_ALIVE_MS = 60000
-
-    @volatile private var hs: ConcurrentHashMap[String, Holder] = new ConcurrentHashMap
-
-    private case class Holder(context: ExecutionContext, pool: Option[ExecutorService])
-
-    private object Config extends NCConfigurable {
-        val sizes: Map[String, Integer] = {
-            val m: Option[Map[String, Integer]] =
-                getMapOpt(
-                    NCModule.getModule match {
-                        case SERVER ⇒ "nlpcraft.server.pools"
-                        case PROBE ⇒ "nlpcraft.probe.pools"
-
-                        case m ⇒ throw new AssertionError(s"Unexpected runtime module: $m")
-                    }
-                )
-
-            m.getOrElse(Map.empty)
-        }
-
-        @throws[NCE]
-        def check(): Unit = {
-            val inv = sizes.filter(_._2 <= 0)
-
-            if (inv.nonEmpty)
-                throw new NCE(s"Invalid pool maximum sizes for: [${inv.keys.mkString(", ")}]")
-        }
-    }
-
-    Config.check()
-
-    def getSystemContext: ExecutionContext =  ExecutionContext.Implicits.global
+    /**
+     * Pools that should NOT default to a system context.
+     * TODO: in the future - we may need to open this to user configuration.
+     */
+    private final val NON_SYS_POOLS = Seq(
+        "probes.communication",
+        "probe.requests",
+        "model.solver.pool"
+    )
 
+    private final val KEEP_ALIVE_MS = 60000
+    private final val POOL_SIZE = Runtime.getRuntime.availableProcessors // Container-aware since JDK 10.
+
+    @volatile private var cache: ConcurrentHashMap[String, Holder] = _
+
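+    /**
+     * Pairs an execution context with the executor service backing it, if any.
+     * @param context Execution context handed out to callers.
+     * @param pool Dedicated executor to shut down on stop; `None` when the system context is used.
+     */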
+    private case class Holder(
+        context: ExecutionContext,
+        pool: Option[ExecutorService]
+    )
+
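+    /**
+     * Gets the default (system) execution context.
+     * @return Scala's global execution context (`ExecutionContext.Implicits.global`).
+     */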
+    def getSystemContext: ExecutionContext = ExecutionContext.Implicits.global
+
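+    /**
+     * Gets the context for the named pool, cached on first use. Call only between start() and stop().
+     * @param name Pool name; names listed in `NON_SYS_POOLS` get a dedicated executor.
+     * @return Dedicated context for non-system pools, otherwise the system context.
+     */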
     def getContext(name: String): ExecutionContext =
-        hs.computeIfAbsent(
+        cache.computeIfAbsent(
             name,
             (_: String) ⇒
-                Config.sizes.get(name) match {
-                    case Some(maxSize) ⇒
-                        val ex = new ThreadPoolExecutor(
-                            0,
-                            maxSize,
-                            KEEP_ALIVE_MS,
-                            TimeUnit.MILLISECONDS,
-                            new LinkedBlockingQueue[Runnable]
-                        )
-
-                        logger.info(s"Custom executor service created for '$name' with maxThreadSize: $maxSize.")
-
-                        Holder(ExecutionContext.fromExecutor(ex), Some(ex))
-                    case None ⇒
-                        logger.info(s"Default executor service created for '$name', because it is not configured.")
-
-                        Holder(getSystemContext, None)
+                if (NON_SYS_POOLS.contains(name)) {
+                    // Create separate executor for these pools...
+                    val exec = new ThreadPoolExecutor(
+                        0,
+                        POOL_SIZE,
+                        KEEP_ALIVE_MS,
+                        TimeUnit.MILLISECONDS,
+                        new LinkedBlockingQueue[Runnable]
+                    )
+
+                    Holder(ExecutionContext.fromExecutor(exec), Some(exec))
                 }
-            ).context
+                else
+                    Holder(getSystemContext, None)
+        )
+        .context
 
     override def start(parent: Span): NCService = startScopedSpan("start", parent) { _ ⇒
         ackStarting()
 
-        hs = new ConcurrentHashMap
+        cache = new ConcurrentHashMap
 
         ackStarted()
     }
@@ -100,9 +96,10 @@ object NCThreadPoolManager extends NCService {
     override def stop(parent: Span): Unit = startScopedSpan("stop", parent) { _ ⇒
         ackStopping()
 
-        hs.values().asScala.flatMap(_.pool).foreach(U.shutdownPool)
-        hs.clear()
-        hs = null
+        cache.values().asScala.flatMap(_.pool).foreach(U.shutdownPool)
+        cache.clear()
+
+        cache = null
 
         ackStopped()
     }