You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nlpcraft.apache.org by ar...@apache.org on 2021/01/25 23:24:01 UTC
[incubator-nlpcraft] branch NLPCRAFT-111 updated: WIP & refactoring.
This is an automated email from the ASF dual-hosted git repository.
aradzinski pushed a commit to branch NLPCRAFT-111
in repository https://gitbox.apache.org/repos/asf/incubator-nlpcraft.git
The following commit(s) were added to refs/heads/NLPCRAFT-111 by this push:
new d77878a WIP & refactoring.
d77878a is described below
commit d77878ab765e553f77933725bfb7e48dc2c9938f
Author: Aaron Radzinski <ar...@datalingvo.com>
AuthorDate: Mon Jan 25 15:08:44 2021 -0800
WIP & refactoring.
---
nlpcraft/src/main/resources/nlpcraft.conf | 13 ---
.../nlpcraft/common/pool/NCThreadPoolManager.scala | 117 ++++++++++-----------
2 files changed, 57 insertions(+), 73 deletions(-)
diff --git a/nlpcraft/src/main/resources/nlpcraft.conf b/nlpcraft/src/main/resources/nlpcraft.conf
index b0ecf3c..1b1fa9d 100644
--- a/nlpcraft/src/main/resources/nlpcraft.conf
+++ b/nlpcraft/src/main/resources/nlpcraft.conf
@@ -207,13 +207,6 @@ nlpcraft {
# 'ctxword' server endpoint URL.
ctxword.url="http://localhost:5000"
-
- # TODO: some description (pool name - max thread size)
- # min 0, keepAliveTime = 1 min
- # pools = {
- # "probes.communication": 10,
- # "probe.requests": 10
- # }
}
# Basic NLP toolkit to use on both server and probes. Possible values:
@@ -331,11 +324,5 @@ nlpcraft {
# Maximum execution result size in bytes. Default value is 1M.
# When exceeded the request will be automatically rejected.
resultMaxSizeBytes = 1048576
-
- # TODO: some description (pool name - max thread size)
- # min 0, keepAliveTime = 1 min
- # pools = {
- # "model.solver.pool" = 10
- # }
}
}
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCThreadPoolManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCThreadPoolManager.scala
index 0f4989f..f3df18f 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCThreadPoolManager.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/pool/NCThreadPoolManager.scala
@@ -18,10 +18,7 @@
package org.apache.nlpcraft.common.pool
import io.opencensus.trace.Span
-import org.apache.nlpcraft.common.config.NCConfigurable
-import org.apache.nlpcraft.common.module.NCModule
-import org.apache.nlpcraft.common.module.NCModule._
-import org.apache.nlpcraft.common.{NCE, NCService, U}
+import org.apache.nlpcraft.common._
import java.util.concurrent._
import scala.collection.JavaConverters._
@@ -31,68 +28,67 @@ import scala.concurrent.ExecutionContext
* Common thread pool manager.
*/
object NCThreadPoolManager extends NCService {
- private final val KEEP_ALIVE_MS = 60000
-
- @volatile private var hs: ConcurrentHashMap[String, Holder] = new ConcurrentHashMap
-
- private case class Holder(context: ExecutionContext, pool: Option[ExecutorService])
-
- private object Config extends NCConfigurable {
- val sizes: Map[String, Integer] = {
- val m: Option[Map[String, Integer]] =
- getMapOpt(
- NCModule.getModule match {
- case SERVER ⇒ "nlpcraft.server.pools"
- case PROBE ⇒ "nlpcraft.probe.pools"
-
- case m ⇒ throw new AssertionError(s"Unexpected runtime module: $m")
- }
- )
-
- m.getOrElse(Map.empty)
- }
-
- @throws[NCE]
- def check(): Unit = {
- val inv = sizes.filter(_._2 <= 0)
-
- if (inv.nonEmpty)
- throw new NCE(s"Invalid pool maximum sizes for: [${inv.keys.mkString(", ")}]")
- }
- }
-
- Config.check()
-
- def getSystemContext: ExecutionContext = ExecutionContext.Implicits.global
+ /**
+ * Pools that should NOT default to a system context.
+ * TODO: in the future we may need to open this up to user configuration.
+ */
+ private final val NON_SYS_POOLS = Seq(
+ "probes.communication",
+ "probe.requests",
+ "model.solver.pool"
+ )
+ private final val KEEP_ALIVE_MS = 60000
+ private final val POOL_SIZE = Runtime.getRuntime.availableProcessors // Container-aware (safe to use inside containers) since JDK 10.
+
+ @volatile private var cache: ConcurrentHashMap[String, Holder] = _
+
+ /**
+ *
+ * @param context
+ * @param pool
+ */
+ private case class Holder(
+ context: ExecutionContext,
+ pool: Option[ExecutorService]
+ )
+
+ /**
+ *
+ * @return
+ */
+ def getSystemContext: ExecutionContext = ExecutionContext.Implicits.global
+
+ /**
+ *
+ * @param name
+ * @return
+ */
def getContext(name: String): ExecutionContext =
- hs.computeIfAbsent(
+ cache.computeIfAbsent(
name,
(_: String) ⇒
- Config.sizes.get(name) match {
- case Some(maxSize) ⇒
- val ex = new ThreadPoolExecutor(
- 0,
- maxSize,
- KEEP_ALIVE_MS,
- TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue[Runnable]
- )
-
- logger.info(s"Custom executor service created for '$name' with maxThreadSize: $maxSize.")
-
- Holder(ExecutionContext.fromExecutor(ex), Some(ex))
- case None ⇒
- logger.info(s"Default executor service created for '$name', because it is not configured.")
-
- Holder(getSystemContext, None)
+ if (NON_SYS_POOLS.contains(name)) {
+ // Create separate executor for these pools...
+ val exec = new ThreadPoolExecutor(
+ 0,
+ POOL_SIZE,
+ KEEP_ALIVE_MS,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue[Runnable]
+ )
+
+ Holder(ExecutionContext.fromExecutor(exec), Some(exec))
}
- ).context
+ else
+ Holder(getSystemContext, None)
+ )
+ .context
override def start(parent: Span): NCService = startScopedSpan("start", parent) { _ ⇒
ackStarting()
- hs = new ConcurrentHashMap
+ cache = new ConcurrentHashMap
ackStarted()
}
@@ -100,9 +96,10 @@ object NCThreadPoolManager extends NCService {
override def stop(parent: Span): Unit = startScopedSpan("stop", parent) { _ ⇒
ackStopping()
- hs.values().asScala.flatMap(_.pool).foreach(U.shutdownPool)
- hs.clear()
- hs = null
+ cache.values().asScala.flatMap(_.pool).foreach(U.shutdownPool)
+ cache.clear()
+
+ cache = null
ackStopped()
}