You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nlpcraft.apache.org by ar...@apache.org on 2020/09/15 02:39:04 UTC

[incubator-nlpcraft] branch NLPCRAFT-108 updated (d3956d6 -> 162bb74)

This is an automated email from the ASF dual-hosted git repository.

aradzinski pushed a change to branch NLPCRAFT-108
in repository https://gitbox.apache.org/repos/asf/incubator-nlpcraft.git.


    from d3956d6  Merge branch 'NLPCRAFT-108' of https://github.com/apache/incubator-nlpcraft into NLPCRAFT-108
     new ab4880a  WIP.
     add 24d65be  Fixing error log.
     new 162bb74  Merge branch 'master' into NLPCRAFT-108

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 nlpcraft/src/main/resources/nlpcraft.conf          |   1 -
 .../org/apache/nlpcraft/common/NCService.scala     |   7 +
 .../apache/nlpcraft/common/socket/NCSocket.scala   |   5 +-
 .../org/apache/nlpcraft/common/util/NCUtils.scala  |  27 +++-
 .../model/tools/cmdline/NCCommandLine.scala        |  58 ++-----
 .../org/apache/nlpcraft/server/NCServer.scala      |   2 +-
 .../nlpcraft/server/ctrl/NCControlManager.scala    | 171 +++++++++++++++++++++
 .../nlpcraft/server/probe/NCProbeManager.scala     |  34 ++--
 .../nlpcraft/server/rest/NCBasicRestApi.scala      |   4 +-
 9 files changed, 231 insertions(+), 78 deletions(-)
 create mode 100644 nlpcraft/src/main/scala/org/apache/nlpcraft/server/ctrl/NCControlManager.scala


[incubator-nlpcraft] 02/02: Merge branch 'master' into NLPCRAFT-108

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aradzinski pushed a commit to branch NLPCRAFT-108
in repository https://gitbox.apache.org/repos/asf/incubator-nlpcraft.git

commit 162bb74b0742f3fa4fb3a6881a1399bfb5df38f1
Merge: ab4880a 24d65be
Author: Aaron Radzinski <ar...@datalingvo.com>
AuthorDate: Mon Sep 14 19:38:50 2020 -0700

    Merge branch 'master' into NLPCRAFT-108

 .../org/apache/nlpcraft/common/util/NCUtils.scala  | 27 +++++++++++++++++++---
 .../nlpcraft/server/probe/NCProbeManager.scala     |  6 ++---
 .../nlpcraft/server/rest/NCBasicRestApi.scala      |  4 ++--
 3 files changed, 29 insertions(+), 8 deletions(-)



[incubator-nlpcraft] 01/02: WIP.

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aradzinski pushed a commit to branch NLPCRAFT-108
in repository https://gitbox.apache.org/repos/asf/incubator-nlpcraft.git

commit ab4880a1d45a3fb5e51de2d6a58a7e23032c129f
Author: Aaron Radzinski <ar...@datalingvo.com>
AuthorDate: Mon Sep 14 19:30:27 2020 -0700

    WIP.
---
 nlpcraft/src/main/resources/nlpcraft.conf          |   1 -
 .../org/apache/nlpcraft/common/NCService.scala     |   7 +
 .../apache/nlpcraft/common/socket/NCSocket.scala   |   5 +-
 .../model/tools/cmdline/NCCommandLine.scala        |  58 ++-----
 .../org/apache/nlpcraft/server/NCServer.scala      |   2 +-
 .../nlpcraft/server/ctrl/NCControlManager.scala    | 171 +++++++++++++++++++++
 .../nlpcraft/server/probe/NCProbeManager.scala     |  28 ++--
 7 files changed, 202 insertions(+), 70 deletions(-)

diff --git a/nlpcraft/src/main/resources/nlpcraft.conf b/nlpcraft/src/main/resources/nlpcraft.conf
index edad00b..54923c0 100644
--- a/nlpcraft/src/main/resources/nlpcraft.conf
+++ b/nlpcraft/src/main/resources/nlpcraft.conf
@@ -176,7 +176,6 @@ nlpcraft {
             pingTimeoutMs = 2000
             soTimeoutMs = 5000
             reconnectTimeoutMs = 5000
-            poolSize = 100
         }
 
         # Default date formatting for 'nlpcraft:date' token detection only.
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/NCService.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/NCService.scala
index 79f72a7..b71e647 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/NCService.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/NCService.scala
@@ -63,6 +63,13 @@ abstract class NCService extends LazyLogging with NCOpenCensusTrace {
     }
 
     /**
+     * Gets name of this service (as its class name).
+     *
+     * @return Name of this service.
+     */
+    def name: String = clsName
+
+    /**
       * Stops this service.
       *
       * @param parent Optional parent span.
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/socket/NCSocket.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/socket/NCSocket.scala
index 026a73a..7b76f4f 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/common/socket/NCSocket.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/common/socket/NCSocket.scala
@@ -29,9 +29,8 @@ import org.apache.nlpcraft.common.crypto.NCCipher
 /**
   * Socket wrapper that does optional encryption and uses HTTP POST protocol for sending and receiving.
   */
-case class NCSocket(socket: Socket, host: String, soTimeout: Int = 20000) extends LazyLogging {
+case class NCSocket(socket: Socket, soTimeout: Int = 20000) extends LazyLogging {
     require(socket != null)
-    require(host != null)
     require(soTimeout >= 0)
 
     socket.setSoTimeout(soTimeout)
@@ -153,5 +152,5 @@ object NCSocket {
      * @return
      */
     def apply(host: String, port: Integer): NCSocket =
-        new NCSocket(new Socket(host, port), host)
+        new NCSocket(new Socket(host, port))
 }
\ No newline at end of file
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/model/tools/cmdline/NCCommandLine.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/model/tools/cmdline/NCCommandLine.scala
index 1593ff2..540bce6 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/model/tools/cmdline/NCCommandLine.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/model/tools/cmdline/NCCommandLine.scala
@@ -148,53 +148,9 @@ object NCCommandLine extends App {
             names = Seq("stop-server"),
             synopsis = s"Stops local REST server.",
             desc = Some(
-                s"Local REST server must be started via $SCRIPT_NAME or ."
+                s"Local REST server must be started via $SCRIPT_NAME or similar way."
             ),
-            body = cmdStartServer,
-            params = Seq(
-                Parameter(
-                    id = "config",
-                    names = Seq("--config", "-c"),
-                    valueDesc = Some("{path}"),
-                    optional = true,
-                    desc =
-                        "Configuration absolute file path. Server will automatically look for 'nlpcraft.conf' " +
-                            "configuration file in the same directory as NLPCraft JAR file. If the configuration file has " +
-                            "different name or in different location use this parameter to provide an alternative path. " +
-                            "Note that the REST server and the data probe can use the same file for their configuration."
-                ),
-                Parameter(
-                    id = "igniteConfig",
-                    names = Seq("--ignite-config", "-i"),
-                    valueDesc = Some("{path}"),
-                    optional = true,
-                    desc =
-                        "Apache Ignite configuration absolute file path. Note that Apache Ignite is used as a cluster " +
-                            "computing plane and a default distributed storage. REST server will automatically look for " +
-                            "'ignite.xml' configuration file in the same directory as NLPCraft JAR file. If the " +
-                            "configuration file has different name or in different location use this parameter to " +
-                            "provide an alternative path."
-                ),
-                Parameter(
-                    id = "outputPath",
-                    names = Seq("--output-path", "-o"),
-                    valueDesc = Some("{path}"),
-                    optional = true,
-                    desc =
-                        "File path for both REST server stdout and stderr output. If not provided, the REST server" +
-                            s"output will be piped into '$${USER_HOME}/.nlpcraft/server-output.txt' file."
-                )
-            ),
-            examples = Seq(
-                Example(
-                    code = s"$$ $SCRIPT_NAME start-server",
-                    desc = "Starts REST server with default configuration."
-                ),
-                Example(
-                    code = s"$$ $SCRIPT_NAME start-server -c=/opt/nlpcraft/nlpcraft.conf",
-                    desc = "Starts REST server with alternative configuration file."
-                )
-            )
+            body = cmdStopServer
         ),
         Command(
             id = "help",
@@ -305,6 +261,16 @@ object NCCommandLine extends App {
      * @param cmd Command descriptor.
      * @param params Parameters, if any, for this command.
      */
+    private def cmdStopServer(cmd: Command, params: Seq[String]): Unit = {
+        title()
+
+        // TODO
+    }
+
+    /**
+     * @param cmd Command descriptor.
+     * @param params Parameters, if any, for this command.
+     */
     private def cmdHelp(cmd: Command, params: Seq[String]): Unit = {
         title()
 
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/server/NCServer.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/server/NCServer.scala
index 83bae94..53927f1 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/server/NCServer.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/server/NCServer.scala
@@ -125,7 +125,7 @@ object NCServer extends App with NCIgniteInstance with LazyLogging with NCOpenCe
                 try
                     p.stop(span)
                 catch {
-                    case e: Exception ⇒ U.prettyError(logger, "Error stopping managers:", e)
+                    case e: Exception ⇒ U.prettyError(logger, s"Error stopping manager: ${p.name}", e)
                 }
             )
         }
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/server/ctrl/NCControlManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/server/ctrl/NCControlManager.scala
new file mode 100644
index 0000000..8984867
--- /dev/null
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/server/ctrl/NCControlManager.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nlpcraft.server.ctrl
+
+import java.io.{EOFException, InterruptedIOException}
+import java.net.{InetSocketAddress, ServerSocket, Socket}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import io.opencensus.trace.Span
+import org.apache.nlpcraft.common._
+import org.apache.nlpcraft.common.socket.NCSocket
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+/**
+ * Local host control protocol (used by `nlpcraft.{sh|cmd}` script.
+ */
+object NCControlManager extends NCService {
+    private final val PORT = 43011
+    private final val LOCALHOST = "127.0.0.1"
+    private final val SO_TIMEOUT = 5000
+
+    @volatile private var srvThread: Thread = _
+    @volatile private var isStopping: AtomicBoolean = _
+
+    override def stop(parent: Span): Unit =
+        startScopedSpan("start", parent) { _ ⇒
+            isStopping = new AtomicBoolean(true)
+
+            U.stopThread(srvThread)
+
+            super.stop()
+        }
+
+    override def start(parent: Span = null): NCService =
+        startScopedSpan("start", parent, "endpoint" → s"$LOCALHOST:$PORT") { _ ⇒
+            isStopping = new AtomicBoolean(false)
+
+            srvThread = startServer()
+
+            srvThread.start()
+
+            this
+        }
+
+    /**
+     *
+     * @param sock
+     */
+    private def processSocket(sock: NCSocket): Unit = {
+
+    }
+
+    /**
+     *
+     */
+    private def startServer(): Thread = {
+        new Thread(s"ctrl-mgr") {
+            private final val thName = getName
+            private var srv: ServerSocket = _
+            @volatile private var stopped = false
+
+            override def isInterrupted: Boolean =
+                super.isInterrupted || stopped
+
+            override def interrupt(): Unit = {
+                super.interrupt()
+
+                U.close(srv)
+
+                stopped = true
+            }
+
+            override def run(): Unit = {
+                try {
+                    body()
+                }
+                catch {
+                    case _: InterruptedException ⇒ logger.trace(s"Thread interrupted: $thName")
+                    case e: Throwable ⇒
+                        U.prettyError(
+                            logger,
+                            s"Unexpected error during '$thName' thread execution:",
+                            e
+                        )
+                }
+                finally
+                    stopped = true
+            }
+
+            private def body(): Unit =
+                while (!isInterrupted)
+                    try {
+                        srv = new ServerSocket()
+
+                        srv.bind(new InetSocketAddress(LOCALHOST, PORT))
+                        srv.setSoTimeout(SO_TIMEOUT)
+
+                        logger.info(s"Control server is listening on '$LOCALHOST:$PORT'")
+
+                        while (!isInterrupted) {
+                            var sock: Socket = null
+
+                            try {
+                                sock = srv.accept()
+
+                                logger.trace(s"Control server accepted new connection from: ${sock.getRemoteSocketAddress}")
+                            }
+                            catch {
+                                case _: InterruptedIOException ⇒ // No-op.
+
+                                // Note that server socket must be closed and created again.
+                                // So, error should be thrown.
+                                case e: Exception ⇒
+                                    U.close(sock)
+
+                                    throw e
+                            }
+
+                            if (sock != null) {
+                                val fut = Future {
+                                    processSocket(NCSocket(sock))
+                                }
+
+                                fut.onComplete {
+                                    case Success(_) ⇒ // No-op.
+
+                                    case Failure(e: NCE) ⇒ logger.warn(e.getMessage, e)
+                                    case Failure(_: EOFException) ⇒ () // Just ignoring.
+                                    case Failure(e: Throwable) ⇒ logger.warn(s"Ignoring socket error: ${e.getLocalizedMessage}")
+                                }
+                            }
+                        }
+                    }
+                    catch {
+                        case e: Exception ⇒
+                            if (!isStopping.get) {
+                                // Release socket asap.
+                                U.close(srv)
+
+                                val ms = Config.reconnectTimeoutMs
+
+                                // Server socket error must be logged.
+                                logger.warn(s"$name server error, re-starting in ${ms / 1000} sec.", e)
+
+                                U.sleep(ms)
+                            }
+                    }
+                    finally {
+                        U.close(srv)
+                    }
+        }
+    }
+}
diff --git a/nlpcraft/src/main/scala/org/apache/nlpcraft/server/probe/NCProbeManager.scala b/nlpcraft/src/main/scala/org/apache/nlpcraft/server/probe/NCProbeManager.scala
index 21f225b..fea92e2 100644
--- a/nlpcraft/src/main/scala/org/apache/nlpcraft/server/probe/NCProbeManager.scala
+++ b/nlpcraft/src/main/scala/org/apache/nlpcraft/server/probe/NCProbeManager.scala
@@ -23,7 +23,7 @@ import java.security.Key
 import java.util
 import java.util.Collections
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors}
+import java.util.concurrent.ConcurrentHashMap
 
 import com.google.gson.Gson
 import com.google.gson.reflect.TypeToken
@@ -58,13 +58,12 @@ object NCProbeManager extends NCService {
     private final val TYPE_MODEL_INFO_RESP = new TypeToken[util.HashMap[String, AnyRef]]() {}.getType
 
     // Type safe and eager configuration container.
-    private[probe] object Config extends NCConfigurable {
+    private object Config extends NCConfigurable {
         final private val pre = "nlpcraft.server.probe"
 
         def getDnHostPort: (String, Integer) = getHostPort(s"$pre.links.downLink")
         def getUpHostPort: (String, Integer) = getHostPort(s"$pre.links.upLink")
 
-        def poolSize: Int = getInt(s"$pre.poolSize")
         def reconnectTimeoutMs: Long = getLong(s"$pre.reconnectTimeoutMs")
         def pingTimeoutMs: Long = getLong(s"$pre.pingTimeoutMs")
         def soTimeoutMs: Int = getInt(s"$pre.soTimeoutMs")
@@ -73,8 +72,8 @@ object NCProbeManager extends NCService {
           *
           */
         def check(): Unit = {
-            val (_, dnPort) =  getDnHostPort
-            val (_, upPort) =  getUpHostPort
+            val (_, dnPort) = getDnHostPort
+            val (_, upPort) = getUpHostPort
 
             val msg1 = "Configuration property must be >= 0 and <= 65535"
             val msg2 = "Configuration property must be > 0"
@@ -94,11 +93,6 @@ object NCProbeManager extends NCService {
                     s"name=$pre.reconnectTimeoutMs, " +
                     s"value=$reconnectTimeoutMs" +
                 s"]")
-            if (poolSize <= 0)
-                throw new NCE(s"$msg2 [" +
-                    s"name=$pre.poolSize, " +
-                    s"value=$poolSize" +
-                s"]")
             if (soTimeoutMs <= 0)
                 throw new NCE(s"$msg2 [" +
                     s"name=$pre.soTimeoutMs, " +
@@ -124,7 +118,7 @@ object NCProbeManager extends NCService {
             s"probeId=$probeId, " +
             s"probeGuid=$probeGuid, " +
             s"probeToken=$probeToken" +
-            s"]"
+        s"]"
 
         def short: String = s"$probeId (guid:$probeGuid, tok:$probeToken)"
     }
@@ -164,7 +158,6 @@ object NCProbeManager extends NCService {
     // All probes pending complete handshake keyed by probe key.
     @volatile private var pending: mutable.Map[ProbeKey, ProbeHolder] = _
 
-    @volatile private var pool: ExecutorService = _
     @volatile private var isStopping: AtomicBoolean = _
 
     @volatile private var modelsInfo: ConcurrentHashMap[String, Promise[java.util.Map[String, AnyRef]]] = _
@@ -189,9 +182,7 @@ object NCProbeManager extends NCService {
         isStopping = new AtomicBoolean(false)
 
         modelsInfo = new ConcurrentHashMap[String, Promise[java.util.Map[String, AnyRef]]]()
-        
-        pool = Executors.newFixedThreadPool(Config.poolSize)
-        
+
         dnSrv = startServer("Downlink", dnHost, dnPort, downLinkHandler)
         upSrv = startServer("Uplink", upHost, upPort, upLinkHandler)
         
@@ -221,9 +212,7 @@ object NCProbeManager extends NCService {
       */
     override def stop(parent: Span = null): Unit = startScopedSpan("stop", parent) { _ ⇒
         isStopping = new AtomicBoolean(true)
-     
-        U.shutdownPools(pool)
-     
+
         U.stopThread(pingSrv)
         U.stopThread(dnSrv)
         U.stopThread(upSrv)
@@ -392,9 +381,10 @@ object NCProbeManager extends NCService {
                                     
                                     throw e
                             }
+
                             if (sock != null) {
                                 val fut = Future {
-                                    fn(NCSocket(sock, sock.getRemoteSocketAddress.toString))
+                                    fn(NCSocket(sock))
                                 }
 
                                 fut.onComplete {