You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nlpcraft.apache.org by se...@apache.org on 2020/09/22 12:17:07 UTC

[incubator-nlpcraft] branch NLPCRAFT-135 created (now 3a29e58)

This is an automated email from the ASF dual-hosted git repository.

sergeykamov pushed a change to branch NLPCRAFT-135
in repository https://gitbox.apache.org/repos/asf/incubator-nlpcraft.git.


      at 3a29e58  WIP.

This branch includes the following new commits:

     new 3a29e58  WIP.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-nlpcraft] 01/01: WIP.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sergeykamov pushed a commit to branch NLPCRAFT-135
in repository https://gitbox.apache.org/repos/asf/incubator-nlpcraft.git

commit 3a29e582b21a515ab5e4430b05e2b6c7fb25ba65
Author: Sergey Kamov <se...@apache.org>
AuthorDate: Tue Sep 22 15:16:59 2020 +0300

    WIP.
---
 nlpcraft/src/main/resources/nlpcraft.conf          |  12 ---
 .../org/apache/nlpcraft/model/NCConversation.java  |   8 ++
 .../probe/mgrs/conversation/NCConversation.scala   |  14 ++-
 .../mgrs/conversation/NCConversationManager.scala  |  92 ++++++++++--------
 .../mgrs/dialogflow/NCDialogFlowManager.scala      | 107 +++++++++++++--------
 .../probe/mgrs/nlp/NCProbeEnrichmentManager.scala  |   1 +
 .../model/conversation/NCTimeoutSpec.scala         |  83 ++++++++++++++++
 7 files changed, 221 insertions(+), 96 deletions(-)

diff --git a/nlpcraft/src/main/resources/nlpcraft.conf b/nlpcraft/src/main/resources/nlpcraft.conf
index 54923c0..418b011 100644
--- a/nlpcraft/src/main/resources/nlpcraft.conf
+++ b/nlpcraft/src/main/resources/nlpcraft.conf
@@ -281,18 +281,6 @@ nlpcraft {
         # Maximum execution result size in bytes. Default value is 1M.
         # When exceeded the request will be automatically rejected.
         resultMaxSizeBytes = 1048576
-
-        #
-        # Timeout in ms for conversation manager garbage collector.
-        # Reduce if you are experiencing a large memory utilization under the load with many concurrent users.
-        #
-        convGcTimeoutMs = 60000
-
-        #
-        # Timeout in ms for dialog flow manager garbage collector.
-        # Reduce if you are experiencing a large memory utilization under the load with many concurrent users.
-        #
-        dialogGcTimeoutMs = 60000
     }
 
     # Basic NLP toolkit to use on both server and probes. Possible values:
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/model/NCConversation.java b/nlpcraft/src/main/scala/org/apache/nlpcraft/model/NCConversation.java
index 5655426..94b7013 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/model/NCConversation.java
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/model/NCConversation.java
@@ -18,6 +18,7 @@
 package org.apache.nlpcraft.model;
 
 import java.util.List;
+import java.util.Map;
 import java.util.function.Predicate;
 
 /**
@@ -83,4 +84,11 @@ public interface NCConversation {
      * @param filter Dialog flow filter based on IDs of previously matched intents.
      */
     void clearDialog(Predicate<String/* Intent ID. */> filter);
+
+    /**
+     * Gets the user data map of this conversation. The map supports concurrent access and is
+     * automatically cleared when the conversation expires after the model's conversation timeout.
+     * @return User data map of this conversation.
+     */
+    Map<String, Object> getData();
 }
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversation.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversation.scala
index 1f8f51b..4823648 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversation.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversation.scala
@@ -18,6 +18,7 @@
 package org.apache.nlpcraft.probe.mgrs.conversation
 
 import java.util
+import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Predicate
 
 import com.typesafe.scalalogging.LazyLogging
@@ -36,9 +37,11 @@ import scala.collection.mutable
 case class NCConversation(
     usrId: Long,
     mdlId: String,
-    updateTimeoutMs: Long,
+    timeoutMs: Long,
     maxDepth: Int
 ) extends LazyLogging with NCOpenCensusTrace {
+    // User data exposed via 'getData'; concurrent map since the GC thread may clear it while requests use it.
+    private final val data = new ConcurrentHashMap[String, Object]()
     /**
      *
      * @param token
@@ -83,7 +86,7 @@ case class NCConversation(
             depth += 1
 
             // Conversation cleared by timeout or when there are too much unsuccessful requests.
-            if (now - lastUpdateTstamp > updateTimeoutMs) {
+            if (now - lastUpdateTstamp > timeoutMs) {
                 stm.clear()
 
                 logger.info(s"Conversation is reset by timeout [" +
@@ -100,7 +103,7 @@ case class NCConversation(
                 s"]")
             }
             else {
-                val minUsageTime = now - updateTimeoutMs
+                val minUsageTime = now - timeoutMs
                 val toks = lastToks.flatten
 
                 for (item ← stm) {
@@ -278,4 +281,9 @@ case class NCConversation(
             new util.ArrayList[NCToken](toks.asJava)
         }
     }
+
+    /**
+      * Gets user data of this conversation; the conversation manager clears it once the conversation expires.
+      */
+    def getData: util.Map[String, Object] = data
 }
\ No newline at end of file
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversationManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversationManager.scala
index cca7d5a..054cce1 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversationManager.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/conversation/NCConversationManager.scala
@@ -17,15 +17,11 @@
 
 package org.apache.nlpcraft.probe.mgrs.conversation
 
-import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
-
 import io.opencensus.trace.Span
 import org.apache.nlpcraft.common._
-import org.apache.nlpcraft.common.config.NCConfigurable
 import org.apache.nlpcraft.probe.mgrs.model.NCModelManager
 
 import scala.collection._
-import scala.collection.mutable.ArrayBuffer
 
 /**
   * Conversation manager.
@@ -34,20 +30,9 @@ object NCConversationManager extends NCService {
     case class Key(userId: Long, mdlId: String)
     case class Value(conv: NCConversation, var tstamp: Long = 0)
 
-    private object Config extends NCConfigurable {
-        private final val name = "nlpcraft.probe.convGcTimeoutMs"
-
-        def timeoutMs: Long = getInt(name)
-
-        def check(): Unit =
-            if (timeoutMs <= 0)
-                throw new NCE(s"Configuration property must be >= 0 [name=$name]")
-    }
-
-    Config.check()
+    private final val convs: mutable.Map[Key, Value] = mutable.HashMap.empty[Key, Value]
 
-    @volatile private var convs: mutable.Map[Key, Value] = _
-    @volatile private var gc: ScheduledExecutorService = _
+    @volatile private var gc: Thread = _
 
     /**
      *
@@ -55,13 +40,23 @@ object NCConversationManager extends NCService {
      * @return
      */
     override def start(parent: Span = null): NCService = startScopedSpan("start", parent) { _ ⇒
-        gc = Executors.newSingleThreadScheduledExecutor
-
-        convs = mutable.HashMap.empty[Key, Value]
-
-        gc.scheduleWithFixedDelay(() ⇒ clearForTimeout(), Config.timeoutMs, Config.timeoutMs, TimeUnit.MILLISECONDS)
+        gc =
+            U.mkThread("conversation-manager-gc") { t ⇒
+                while (!t.isInterrupted)
+                    try
+                        convs.synchronized {
+                            val sleepTime = clearForTimeout() - System.currentTimeMillis()
+                            // Sleep until the nearest expiration, or until 'notifyAll' signals an update.
+                            if (sleepTime > 0)
+                                convs.wait(sleepTime)
+                        }
+                    catch {
+                        case _: InterruptedException ⇒ Thread.currentThread().interrupt() // 'wait' cleared the flag.
+                        case e: Throwable ⇒ logger.error(s"Unexpected error for: ${t.getName}", e)
+                    }
+            }
 
-        logger.info(s"Conversation manager GC started, checking every ${Config.timeoutMs}ms.")
+        gc.start()
 
         ackStart()
     }
@@ -71,7 +66,11 @@ object NCConversationManager extends NCService {
      * @param parent Optional parent span.
      */
     override def stop(parent: Span = null): Unit = startScopedSpan("stop", parent) { _ ⇒
-        U.shutdownPools(gc)
+        U.stopThread(gc)
+
+        gc = null
+
+        convs.synchronized { convs.clear() } // Every other access to 'convs' is synchronized too.
 
         logger.info("Conversation manager GC stopped.")
 
@@ -79,24 +78,35 @@ object NCConversationManager extends NCService {
     }
 
     /**
-      *
+      * Removes expired conversations, clearing their user data. Must be called while holding the 'convs' lock.
       */
-    private def clearForTimeout(): Unit =
-        startScopedSpan("clearForTimeout", "timeoutMs" → Config.timeoutMs) { _ ⇒
-            convs.synchronized {
-                val delKeys = ArrayBuffer.empty[Key]
+    private def clearForTimeout(): Long =
+        startScopedSpan("clearForTimeout") { _ ⇒
+            require(Thread.holdsLock(convs))
 
-                for ((key, value) ← convs)
-                    NCModelManager.getModelOpt(key.mdlId) match {
-                        case Some(data) ⇒
-                            if (value.tstamp < System.currentTimeMillis() - data.model.getConversationTimeout)
-                                delKeys += key
+            val now = System.currentTimeMillis()
+            val delKeys = mutable.HashSet.empty[Key]
 
-                        case None ⇒ delKeys += key
+            for ((key, value) ← convs) {
+                val del =
+                    NCModelManager.getModelOpt(key.mdlId) match {
+                        case Some(mdl) ⇒ value.tstamp <= now - mdl.model.getConversationTimeout // Inclusive: no busy loop.
+                        case None ⇒ true
                     }
 
-                convs --= delKeys
+                if (del) {
+                    value.conv.getData.clear()
+
+                    delKeys += key
+                }
             }
+
+            convs --= delKeys
+            // Next GC wake-up time: the nearest expiration, or 'Long.MaxValue' if no conversations are left.
+            if (convs.nonEmpty)
+                convs.values.map(v ⇒ v.tstamp + v.conv.timeoutMs).min
+            else
+                Long.MaxValue
         }
 
     /**
@@ -115,10 +125,12 @@ object NCConversationManager extends NCService {
                     Key(usrId, mdlId),
                     Value(NCConversation(usrId, mdlId, mdl.getConversationTimeout, mdl.getConversationDepth))
                 )
-                
+
                 v.tstamp = U.nowUtcMs()
-                
-                v
-            }.conv
+                // Wake up the GC thread so it recomputes its wake-up time (it may be waiting indefinitely).
+                convs.notifyAll()
+
+                v.conv
+            }
         }
 }
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/dialogflow/NCDialogFlowManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/dialogflow/NCDialogFlowManager.scala
index 7984199..78b6593 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/dialogflow/NCDialogFlowManager.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/dialogflow/NCDialogFlowManager.scala
@@ -17,15 +17,11 @@
 
 package org.apache.nlpcraft.probe.mgrs.dialogflow
 
-import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
-
 import io.opencensus.trace.Span
-import org.apache.nlpcraft.common.config.NCConfigurable
 import org.apache.nlpcraft.common.{NCService, _}
 import org.apache.nlpcraft.probe.mgrs.model.NCModelManager
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection._
 
 /**
  * Dialog flow manager.
@@ -34,20 +30,9 @@ object NCDialogFlowManager extends NCService {
     case class Key(usrId: Long, mdlId: String)
     case class Value(intent: String, tstamp: Long)
 
-    private object Config extends NCConfigurable {
-        private final val name = "nlpcraft.probe.dialogGcTimeoutMs"
-
-        def timeoutMs: Long = getInt(name)
-
-        def check(): Unit =
-            if (timeoutMs <= 0)
-                throw new NCE(s"Configuration property must be >= 0 [name=$name]")
-    }
+    private final val flow = mutable.HashMap.empty[Key, mutable.ArrayBuffer[Value]]
 
-    Config.check()
-
-    @volatile private var flow: mutable.Map[Key, ArrayBuffer[Value]] = _
-    @volatile private var gc: ScheduledExecutorService = _
+    @volatile private var gc: Thread = _
 
     /**
      *
@@ -55,13 +40,23 @@ object NCDialogFlowManager extends NCService {
      * @return
      */
     override def start(parent: Span = null): NCService = startScopedSpan("start", parent) { _ ⇒
-        flow = mutable.HashMap.empty[Key, ArrayBuffer[Value]]
-
-        gc = Executors.newSingleThreadScheduledExecutor
-
-        gc.scheduleWithFixedDelay(() ⇒ clearForTimeout(), Config.timeoutMs, Config.timeoutMs, TimeUnit.MILLISECONDS)
+        gc =
+            U.mkThread("dialog-flow-manager-gc") { t ⇒
+                while (!t.isInterrupted)
+                    try
+                        flow.synchronized {
+                            val sleepTime = clearForTimeout() - System.currentTimeMillis()
+                            // Sleep until the nearest expiration, or until 'notifyAll' signals an update.
+                            if (sleepTime > 0)
+                                flow.wait(sleepTime)
+                        }
+                    catch {
+                        case _: InterruptedException ⇒ Thread.currentThread().interrupt() // 'wait' cleared the flag.
+                        case e: Throwable ⇒ logger.error(s"Unexpected error for: ${t.getName}", e)
+                    }
+            }
 
-        logger.info(s"Dialog flow manager GC started, checking every ${Config.timeoutMs}ms.")
+        gc.start()
 
         ackStart()
     }
@@ -71,7 +66,11 @@ object NCDialogFlowManager extends NCService {
      * @param parent Optional parent span.
      */
     override def stop(parent: Span = null): Unit = startScopedSpan("stop", parent) { _ ⇒
-        U.shutdownPools(gc)
+        U.stopThread(gc)
+
+        gc = null
+
+        flow.synchronized { flow.clear() } // Every other access to 'flow' is synchronized too.
 
         logger.info("Dialog flow manager GC stopped.")
 
@@ -88,9 +87,11 @@ object NCDialogFlowManager extends NCService {
     def addMatchedIntent(intId: String, usrId: Long, mdlId: String, parent: Span = null): Unit = {
         startScopedSpan("addMatchedIntent", parent, "usrId" → usrId, "mdlId" → mdlId, "intId" → intId) { _ ⇒
             flow.synchronized {
-                flow.getOrElseUpdate(Key(usrId, mdlId), ArrayBuffer.empty[Value]).append(
+                flow.getOrElseUpdate(Key(usrId, mdlId), mutable.ArrayBuffer.empty[Value]).append(
                     Value(intId, System.currentTimeMillis())
                 )
+                // Wake up the GC thread so it recomputes its wake-up time (it may be waiting indefinitely).
+                flow.notifyAll()
             }
 
             logger.trace(s"Added to dialog flow [mdlId=$mdlId, intId=$intId, userId=$usrId]")
@@ -107,30 +108,50 @@ object NCDialogFlowManager extends NCService {
     def getDialogFlow(usrId: Long, mdlId: String, parent: Span = null): Seq[String] =
         startScopedSpan("getDialogFlow", parent, "usrId" → usrId, "mdlId" → mdlId) { _ ⇒
             flow.synchronized {
-                flow.getOrElseUpdate(Key(usrId, mdlId), ArrayBuffer.empty[Value]).map(_.intent)
+                // Read-only lookup: don't create empty entries nor wake up the GC thread on reads.
+                flow.get(Key(usrId, mdlId)) match {
+                    case Some(values) ⇒ values.map(_.intent)
+                    case None ⇒ Seq.empty[String]
+                }
             }
         }
 
     /**
-     *
+     * Removes expired intents and empty or unknown-model flows. Must be called while holding the 'flow' lock.
      */
-    private def clearForTimeout(): Unit =
-        startScopedSpan("clearForTimeout", "timeoutMs" → Config.timeoutMs) { _ ⇒
-            flow.synchronized {
-                val delKeys = ArrayBuffer.empty[Key]
+    private def clearForTimeout(): Long =
+        startScopedSpan("clearForTimeout") { _ ⇒
+            require(Thread.holdsLock(flow))
 
-                for ((key, values) ← flow)
-                    NCModelManager.getModelOpt(key.mdlId) match {
-                        case Some(data) ⇒
-                            val ms = System.currentTimeMillis() - data.model.getConversationTimeout
+            val now = System.currentTimeMillis()
+            val delKeys = mutable.HashSet.empty[Key]
+            val timeouts = mutable.HashMap.empty[String, Long]
 
-                            values --= values.filter(_.tstamp < ms)
+            for ((key, values) ← flow)
+                NCModelManager.getModelOpt(key.mdlId) match {
+                    case Some(mdl) ⇒
+                        val ms = now - mdl.model.getConversationTimeout
 
-                        case None ⇒ delKeys += key
-                    }
+                        values --= values.filter(_.tstamp <= ms) // Inclusive: no busy loop at the boundary.
 
-                flow --= delKeys
-            }
+                        timeouts += key.mdlId → mdl.model.getConversationTimeout // Same key as read below.
+
+                        // https://github.com/scala/bug/issues/10151
+                        // Scala bug workaround.
+                        ()
+                    case None ⇒ delKeys += key
+                }
+
+            delKeys ++= flow.filter(_._2.isEmpty).keySet
+
+            flow --= delKeys
+            // Next GC wake-up time: the nearest intent expiration, or 'Long.MaxValue' if nothing is left.
+            val times = (for ((key, values) ← flow) yield values.map(v ⇒ v.tstamp + timeouts(key.mdlId))).flatten
+
+            if (times.nonEmpty)
+                times.min
+            else
+                Long.MaxValue
         }
     
     /**
@@ -144,6 +165,8 @@ object NCDialogFlowManager extends NCService {
         startScopedSpan("clear", parent, "usrId" → usrId, "mdlId" → mdlId) { _ ⇒
             flow.synchronized {
                 flow -= Key(usrId, mdlId)
+
+                flow.notifyAll()
             }
         }
     
@@ -161,6 +184,8 @@ object NCDialogFlowManager extends NCService {
 
             flow.synchronized {
                 flow(key) = flow(key).filterNot(v ⇒ pred(v.intent))
+                // Wake up the GC thread so it re-evaluates the (possibly now empty) flow.
+                flow.notifyAll()
             }
         }
 }
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/nlp/NCProbeEnrichmentManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/nlp/NCProbeEnrichmentManager.scala
index e40e5ac..4a3581c 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/nlp/NCProbeEnrichmentManager.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/probe/mgrs/nlp/NCProbeEnrichmentManager.scala
@@ -563,6 +563,7 @@ object NCProbeEnrichmentManager extends NCService with NCOpenCensusModelStats {
                 override def getDialogFlow: util.List[String] = NCDialogFlowManager.getDialogFlow(usrId, mdlId, span).asJava
                 override def clearStm(filter: Predicate[NCToken]): Unit = conv.clearTokens(filter)
                 override def clearDialog(filter: Predicate[String]): Unit = NCDialogFlowManager.clear(usrId, mdlId, span)
+                override def getData: util.Map[String, Object] = conv.getData
             }
 
             override def isOwnerOf(tok: NCToken): Boolean = allVars.contains(tok)
diff --git a/nlpcraft/src/test/scala/org/apache/nlpcraft/model/conversation/NCTimeoutSpec.scala b/nlpcraft/src/test/scala/org/apache/nlpcraft/model/conversation/NCTimeoutSpec.scala
new file mode 100644
index 0000000..1ae4879
--- /dev/null
+++ b/nlpcraft/src/test/scala/org/apache/nlpcraft/model/conversation/NCTimeoutSpec.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nlpcraft.model.conversation
+
+import java.io.IOException
+import java.util
+import java.util.Collections
+
+import org.apache.nlpcraft.common.NCException
+import org.apache.nlpcraft.model.{NCElement, NCIntent, NCIntentMatch, NCModel, NCResult}
+import org.apache.nlpcraft.{NCTestContext, NCTestEnvironment}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+class NCTimeoutSpecModel extends NCModel {
+    @volatile private var cnt = 0 // Callbacks are not guaranteed to run on the same thread.
+
+    override def getId: String = this.getClass.getSimpleName
+    override def getName: String = this.getClass.getSimpleName
+    override def getVersion: String = "1.0.0"
+
+    override def getConversationTimeout: Long = 2000
+
+    override def getElements: util.Set[NCElement] =
+        Collections.singleton(
+            new NCElement {
+                override def getId: String = "test"
+                override def getSynonyms: util.List[String] = Collections.singletonList("test")
+            }
+        )
+
+    @NCIntent("intent=req conv=true term={id == 'test'}")
+    def onMatch(ctx: NCIntentMatch): NCResult = {
+        val conv = ctx.getContext.getConversation
+
+        conv.getData.put("key", "value")
+        assertTrue(conv.getData.containsKey("key"))
+
+        // Only the very first request starts with an empty dialog flow.
+        if (cnt == 0) assertTrue(conv.getDialogFlow.isEmpty) else assertFalse(conv.getDialogFlow.isEmpty)
+
+        // Outlive the conversation timeout so that both user data and dialog flow get cleared.
+        Thread.sleep(getConversationTimeout + 1000)
+
+        assertTrue(conv.getData.isEmpty)
+        assertTrue(conv.getDialogFlow.isEmpty)
+
+        val msg = s"OK-$cnt"
+
+        cnt = cnt + 1
+
+        NCResult.json(msg)
+    }
+}
+
+/**
+  * @see NCTimeoutSpecModel
+  */
+@NCTestEnvironment(model = classOf[NCTimeoutSpecModel], startClient = true)
+class NCTimeoutSpec extends NCTestContext {
+    @Test
+    @throws[NCException]
+    @throws[IOException]
+    private[conversation] def test(): Unit =
+        // Two sequential requests; the model itself asserts the timeout behavior inside each callback.
+        for (_ ← 1 to 2)
+            assertTrue(getClient.ask("test").isOk)
+}