You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by da...@apache.org on 2016/01/18 11:10:36 UTC
incubator-s2graph git commit: [S2GRAPH-9] Provide rest server using
netty
Repository: incubator-s2graph
Updated Branches:
refs/heads/master 66be5c8c3 -> 2b63878cc
[S2GRAPH-9] Provide rest server using netty
Move common codes for rest into s2core.
Move test cases into s2core.
Add Netty server.
JIRA:
[S2GRAPH-9] https://issues.apache.org/jira/browse/S2GRAPH-9
Pull Request:
Closes #8
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2b63878c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2b63878c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2b63878c
Branch: refs/heads/master
Commit: 2b63878cc3850b8308fb123c824870066efbef09
Parents: 66be5c8
Author: daewon <da...@apache.org>
Authored: Mon Jan 18 17:37:13 2016 +0900
Committer: daewon <da...@apache.org>
Committed: Mon Jan 18 17:37:13 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
build.sbt | 4 +
s2core/src/main/resources/reference.conf | 22 +-
.../com/kakao/s2graph/core/PostProcess.scala | 1 -
.../kakao/s2graph/core/mysqls/Experiment.scala | 5 +-
.../kakao/s2graph/core/rest/RequestParser.scala | 2 +-
.../kakao/s2graph/core/rest/RestCaller.scala | 183 -----------
.../kakao/s2graph/core/rest/RestHandler.scala | 199 ++++++++++++
.../kakao/s2graph/core/utils/Extentions.scala | 7 +
.../s2graph/core/parsers/WhereParserTest.scala | 2 +
s2rest_netty/src/main/scala/Server.scala | 200 ++++++++++++
s2rest_play/app/Bootstrap.scala | 4 +-
.../app/controllers/ApplicationController.scala | 17 +
.../app/controllers/ExperimentController.scala | 107 +------
.../app/controllers/QueryController.scala | 311 ++-----------------
15 files changed, 489 insertions(+), 577 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 188d194..f9c6964 100644
--- a/CHANGES
+++ b/CHANGES
@@ -18,4 +18,6 @@ Release 0.12.1 - unreleased
SUB TASKS
+ S2GRAPH-9: Provide rest server using netty. (Committed by daewon).
+
S2GRAPH-7: Abstract common codes for rest project into s2core. (Committed by daewon).
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index 30e5b64..5efa284 100755
--- a/build.sbt
+++ b/build.sbt
@@ -26,6 +26,10 @@ lazy val s2rest_play = project.enablePlugins(PlayScala)
.settings(commonSettings: _*)
.settings(testOptions in Test += Tests.Argument("sequential"))
+lazy val s2rest_netty = project
+ .dependsOn(s2core)
+ .settings(commonSettings: _*)
+
lazy val s2core = project.settings(commonSettings: _*)
lazy val spark = project.settings(commonSettings: _*)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/reference.conf b/s2core/src/main/resources/reference.conf
index b527f7f..cef0284 100644
--- a/s2core/src/main/resources/reference.conf
+++ b/s2core/src/main/resources/reference.conf
@@ -2,8 +2,28 @@
phase=dev
host=localhost
+# Hbase
+hbase.table.compression.algorithm="gz"
hbase.zookeeper.quorum=${host}
+# Asynchbase
+hbase.client.retries.number=100
+hbase.rpcs.buffered_flush_interval=100
+hbase.rpc.timeout=0
+
+# local retry number
+max.retry.number=100
+max.back.off=50
+
+# Future cache.
+future.cache.max.size=100000
+future.cache.expire.after.write=10000
+future.cache.expire.after.access=5000
+
+# Local Cache
+cache.ttl.seconds=60
+cache.max.size=100000
+
# DB
s2graph.models.table.name="models-dev"
db.default.driver="com.mysql.jdbc.Driver"
@@ -11,8 +31,6 @@ db.default.url="jdbc:mysql://"${host}":3306/graph_dev"
db.default.user="graph"
db.default.password="graph"
-cache.max.size=100000
-
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
index 0a26d26..56d8591 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
@@ -323,7 +323,6 @@ object PostProcess extends JSONParser {
to <- innerValToJsValue(edge.tgtVertex.id.innerId, tgtColumn.columnType)
} yield {
val targetColumns = if (q.selectColumnsSet.isEmpty) reservedColumns else (reservedColumns & q.selectColumnsSet) + "props"
-
val _propsMap = queryParam.label.metaPropsDefaultMapInner ++ propsToJson(edge, q, queryParam)
val propsMap = if (q.selectColumnsSet.nonEmpty) _propsMap.filterKeys(q.selectColumnsSet) else _propsMap
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
index 93cc374..d484914 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
@@ -5,10 +5,9 @@ import scalikejdbc._
import scala.util.Random
-/**
- * Created by shon on 8/5/15.
- */
object Experiment extends Model[Experiment] {
+ val impressionKey = "S2-Impression-Id"
+
def apply(rs: WrappedResultSet): Experiment = {
Experiment(rs.intOpt("id"),
rs.int("service_id"),
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
index 98cc68b..78fc375 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
@@ -6,6 +6,7 @@ import com.kakao.s2graph.core.mysqls._
import com.kakao.s2graph.core.parsers.WhereParser
import com.kakao.s2graph.core.types._
import com.typesafe.config.Config
+
import play.api.libs.json._
import scala.util.{Failure, Success, Try}
@@ -482,7 +483,6 @@ class RequestParser(config: Config) extends JSONParser {
}
(src, tgt, QueryParam(LabelWithDirection(label.id.get, dir)))
}
-
(quads, isReverted)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala
deleted file mode 100644
index bef1dec..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-package com.kakao.s2graph.core.rest
-
-import java.net.URL
-
-import com.kakao.s2graph.core.GraphExceptions.BadQueryException
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.{Bucket, Experiment, Service}
-import com.kakao.s2graph.core.utils.logger
-import play.api.libs.json._
-
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.Try
-
-/**
- * Public API only return Future.successful or Future.failed
- * Don't throw exception
- */
-class RestCaller(graph: Graph)(implicit ec: ExecutionContext) {
- val s2Parser = new RequestParser(graph.config)
-
- /**
- * Public APIS
- */
- def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String): Future[(JsValue, String)] = {
- try {
- val bucketOpt = for {
- service <- Service.findByAccessToken(accessToken)
- experiment <- Experiment.findBy(service.id.get, experimentName)
- bucket <- experiment.findBucket(uuid)
- } yield bucket
-
- val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found"))
- if (bucket.isGraphQuery) buildRequestInner(contentsBody, bucket, uuid).map(_ -> bucket.impressionId)
- else throw new RuntimeException("not supported yet")
- } catch {
- case e: Exception => Future.failed(e)
- }
- }
-
- def uriMatch(uri: String, jsQuery: JsValue): Future[JsValue] = {
- try {
- uri match {
- case "/graphs/getEdges" => getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)
- case "/graphs/getEdges/grouped" => getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted)
- case "/graphs/getEdgesExcluded" => getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)
- case "/graphs/getEdgesExcluded/grouped" => getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)
- case "/graphs/checkEdges" => checkEdges(jsQuery)
- case "/graphs/getEdgesGrouped" => getEdgesAsync(jsQuery)(PostProcess.summarizeWithList)
- case "/graphs/getEdgesGroupedExcluded" => getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude)
- case "/graphs/getEdgesGroupedExcludedFormatted" => getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)
- case "/graphs/getVertices" => getVertices(jsQuery)
- case _ => throw new RuntimeException("route is not found")
- }
- } catch {
- case e: Exception => Future.failed(e)
- }
- }
-
- def checkEdges(jsValue: JsValue): Future[JsValue] = {
- try {
- val (quads, isReverted) = s2Parser.toCheckEdgeParam(jsValue)
-
- graph.checkEdges(quads).map { case queryRequestWithResultLs =>
- val edgeJsons = for {
- queryRequestWithResult <- queryRequestWithResultLs
- (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- edgeWithScore <- queryResult.edgeWithScoreLs
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- convertedEdge = if (isReverted) edge.duplicateEdge else edge
- edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam)
- } yield Json.toJson(edgeJson)
-
- Json.toJson(edgeJsons)
- }
- } catch {
- case e: Exception => Future.failed(e)
- }
- }
-
- /**
- * Private APIS
- */
- private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = {
- val filterOutQueryResultsLs = q.filterOutQuery match {
- case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
- case None => Future.successful(Seq.empty)
- }
-
- for {
- queryResultsLs <- graph.getEdges(q)
- filterOutResultsLs <- filterOutQueryResultsLs
- } yield {
- val json = post(queryResultsLs, filterOutResultsLs)
- json
- }
- }
-
- private def getEdgesAsync(jsonQuery: JsValue)
- (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
-
- val fetch = eachQuery(post) _
- jsonQuery match {
- case JsArray(arr) => Future.traverse(arr.map(s2Parser.toQuery(_)))(fetch).map(JsArray)
- case obj@JsObject(_) => fetch(s2Parser.toQuery(obj))
- case _ => throw BadQueryException("Cannot support")
- }
- }
-
- private def getEdgesExcludedAsync(jsonQuery: JsValue)
- (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
- val q = s2Parser.toQuery(jsonQuery)
- val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
-
- val fetchFuture = graph.getEdges(q)
- val excludeFuture = graph.getEdges(filterOutQuery)
-
- for {
- queryResultLs <- fetchFuture
- exclude <- excludeFuture
- } yield {
- post(queryResultLs, exclude)
- }
- }
-
- private def getVertices(jsValue: JsValue) = {
- val jsonQuery = jsValue
- val ts = System.currentTimeMillis()
- val props = "{}"
-
- val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
- val serviceName = (js \ "serviceName").as[String]
- val columnName = (js \ "columnName").as[String]
- for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield {
- Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props)
- }
- }
-
- graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) }
- }
-
-
- private def makeRequestJson(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): JsValue = {
- var body = bucket.requestBody.replace("#uuid", uuid)
- for {
- requestKeyJson <- requestKeyJsonOpt
- jsObj <- requestKeyJson.asOpt[JsObject]
- (key, value) <- jsObj.fieldSet
- } {
- val replacement = value match {
- case JsString(s) => s
- case _ => value.toString
- }
- body = body.replace(key, replacement)
- }
-
- Try(Json.parse(body)).recover {
- case e: Exception =>
- throw new BadQueryException(s"wrong or missing template parameter: ${e.getMessage.takeWhile(_ != '\n')}")
- } get
- }
-
- private def buildRequestInner(contentsBody: JsValue, bucket: Bucket, uuid: String): Future[JsValue] = {
- if (bucket.isEmpty) Future.successful(PostProcess.emptyResults)
- else {
- val jsonBody = makeRequestJson(Option(contentsBody), bucket, uuid)
- val url = new URL(bucket.apiPath)
- val path = url.getPath()
-
- // dummy log for sampling
- val experimentLog = s"POST $path took -1 ms 200 -1 $jsonBody"
-
- logger.info(experimentLog)
-
- uriMatch(path, jsonBody)
- }
- }
-
- def calcSize(js: JsValue): Int = js match {
- case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0)
- case JsArray(seq) => seq.map(js => (js \ "size").asOpt[Int].getOrElse(0)).sum
- case _ => 0
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
new file mode 100644
index 0000000..9647314
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
@@ -0,0 +1,199 @@
+package com.kakao.s2graph.core.rest
+
+import java.net.URL
+
+import com.kakao.s2graph.core.GraphExceptions.BadQueryException
+import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.mysqls.{Bucket, Experiment, Service}
+import com.kakao.s2graph.core.utils.logger
+import play.api.libs.json._
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Try
+
+
+object RestHandler {
+ case class HandlerResult(body: Future[JsValue], headers: (String, String)*)
+}
+
+/**
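+ * Public API: doPost and checkEdges always return a HandlerResult whose body is a successful or failed Future.
+ * They do not throw; any Exception is caught and wrapped in Future.failed.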
+ */
+class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
+
+ import RestHandler._
+
+ val s2Parser = new RequestParser(graph.config)
+
+ /**
+ * Public APIs
+ */
+ def doPost(uri: String, jsQuery: JsValue): HandlerResult = {
+ try {
+ uri match {
+ case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
+ case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted))
+ case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
+ case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
+ case "/graphs/checkEdges" => checkEdges(jsQuery)
+ case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList))
+ case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude))
+ case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
+ case "/graphs/getVertices" => HandlerResult(getVertices(jsQuery))
+ case uri if uri.startsWith("/graphs/experiment") =>
+ val Array(accessToken, experimentName, uuid) = uri.split("/").takeRight(3)
+ experiment(jsQuery, accessToken, experimentName, uuid)
+ case _ => throw new RuntimeException("route is not found")
+ }
+ } catch {
+ case e: Exception => HandlerResult(Future.failed(e))
+ }
+ }
+
+ // TODO: Refactor to doGet
+ def checkEdges(jsValue: JsValue): HandlerResult = {
+ try {
+ val (quads, isReverted) = s2Parser.toCheckEdgeParam(jsValue)
+
+ HandlerResult(graph.checkEdges(quads).map { case queryRequestWithResultLs =>
+ val edgeJsons = for {
+ queryRequestWithResult <- queryRequestWithResultLs
+ (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
+ edgeWithScore <- queryResult.edgeWithScoreLs
+ (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+ convertedEdge = if (isReverted) edge.duplicateEdge else edge
+ edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam)
+ } yield Json.toJson(edgeJson)
+
+ Json.toJson(edgeJsons)
+ })
+ } catch {
+ case e: Exception => HandlerResult(Future.failed(e))
+ }
+ }
+
+
+ /**
+ * Private APIs
+ */
+ private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String): HandlerResult = {
+ try {
+ val bucketOpt = for {
+ service <- Service.findByAccessToken(accessToken)
+ experiment <- Experiment.findBy(service.id.get, experimentName)
+ bucket <- experiment.findBucket(uuid)
+ } yield bucket
+
+ val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found"))
+ if (bucket.isGraphQuery) {
+ val ret = buildRequestInner(contentsBody, bucket, uuid)
+ HandlerResult(ret.body, Experiment.impressionKey -> bucket.impressionId)
+ }
+ else throw new RuntimeException("not supported yet")
+ } catch {
+ case e: Exception => HandlerResult(Future.failed(e))
+ }
+ }
+
+ private def buildRequestInner(contentsBody: JsValue, bucket: Bucket, uuid: String): HandlerResult = {
+ if (bucket.isEmpty) HandlerResult(Future.successful(PostProcess.emptyResults))
+ else {
+ val jsonBody = makeRequestJson(Option(contentsBody), bucket, uuid)
+ val url = new URL(bucket.apiPath)
+ val path = url.getPath()
+
+ // dummy log for sampling
+ val experimentLog = s"POST $path took -1 ms 200 -1 $jsonBody"
+
+ logger.info(experimentLog)
+
+ doPost(path, jsonBody)
+ }
+ }
+
+ private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = {
+ val filterOutQueryResultsLs = q.filterOutQuery match {
+ case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
+ case None => Future.successful(Seq.empty)
+ }
+
+ for {
+ queryResultsLs <- graph.getEdges(q)
+ filterOutResultsLs <- filterOutQueryResultsLs
+ } yield {
+ val json = post(queryResultsLs, filterOutResultsLs)
+ json
+ }
+ }
+
+ private def getEdgesAsync(jsonQuery: JsValue)
+ (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
+
+ val fetch = eachQuery(post) _
+ jsonQuery match {
+ case JsArray(arr) => Future.traverse(arr.map(s2Parser.toQuery(_)))(fetch).map(JsArray)
+ case obj@JsObject(_) => fetch(s2Parser.toQuery(obj))
+ case _ => throw BadQueryException("Cannot support")
+ }
+ }
+
+ private def getEdgesExcludedAsync(jsonQuery: JsValue)
+ (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
+ val q = s2Parser.toQuery(jsonQuery)
+ val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
+
+ val fetchFuture = graph.getEdges(q)
+ val excludeFuture = graph.getEdges(filterOutQuery)
+
+ for {
+ queryResultLs <- fetchFuture
+ exclude <- excludeFuture
+ } yield {
+ post(queryResultLs, exclude)
+ }
+ }
+
+ private def getVertices(jsValue: JsValue) = {
+ val jsonQuery = jsValue
+ val ts = System.currentTimeMillis()
+ val props = "{}"
+
+ val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
+ val serviceName = (js \ "serviceName").as[String]
+ val columnName = (js \ "columnName").as[String]
+ for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield {
+ Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props)
+ }
+ }
+
+ graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) }
+ }
+
+
+ private def makeRequestJson(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): JsValue = {
+ var body = bucket.requestBody.replace("#uuid", uuid)
+ for {
+ requestKeyJson <- requestKeyJsonOpt
+ jsObj <- requestKeyJson.asOpt[JsObject]
+ (key, value) <- jsObj.fieldSet
+ } {
+ val replacement = value match {
+ case JsString(s) => s
+ case _ => value.toString
+ }
+ body = body.replace(key, replacement)
+ }
+
+ Try(Json.parse(body)).recover {
+ case e: Exception =>
+ throw new BadQueryException(s"wrong or missing template parameter: ${e.getMessage.takeWhile(_ != '\n')}")
+ } get
+ }
+
+ def calcSize(js: JsValue): Int = js match {
+ case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0)
+ case JsArray(seq) => seq.map(js => (js \ "size").asOpt[Int].getOrElse(0)).sum
+ case _ => 0
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
index 3812c2c..4858f60 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
@@ -1,6 +1,8 @@
package com.kakao.s2graph.core.utils
import com.stumbleupon.async.{Callback, Deferred}
+import com.typesafe.config.Config
+
import scala.concurrent.{ExecutionContext, Future, Promise}
object Extensions {
@@ -64,4 +66,9 @@ object Extensions {
}
+ implicit class ConfigOps(config: Config) {
+ def getBooleanWithFallback(key: String, defaultValue: Boolean): Boolean =
+ if (config.hasPath(key)) config.getBoolean(key) else defaultValue
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
index 659983c..de19a75 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
@@ -7,6 +7,8 @@ import org.scalatest.{FunSuite, Matchers}
import play.api.libs.json.Json
class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels {
+ initTests()
+
// dummy data for dummy edge
initTests()
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2rest_netty/src/main/scala/Server.scala
----------------------------------------------------------------------
diff --git a/s2rest_netty/src/main/scala/Server.scala b/s2rest_netty/src/main/scala/Server.scala
new file mode 100644
index 0000000..7290b8a
--- /dev/null
+++ b/s2rest_netty/src/main/scala/Server.scala
@@ -0,0 +1,200 @@
+package com.kakao.s2graph.rest.netty
+
+import java.util.concurrent.Executors
+
+import com.kakao.s2graph.core.GraphExceptions.BadQueryException
+import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.rest.RestHandler.HandlerResult
+import com.kakao.s2graph.core.rest._
+import com.kakao.s2graph.core.utils.Extensions._
+import com.kakao.s2graph.core.utils.logger
+import com.typesafe.config.ConfigFactory
+import io.netty.bootstrap.ServerBootstrap
+import io.netty.buffer.{ByteBuf, Unpooled}
+import io.netty.channel._
+import io.netty.channel.nio.NioEventLoopGroup
+import io.netty.channel.socket.SocketChannel
+import io.netty.channel.socket.nio.NioServerSocketChannel
+import io.netty.handler.codec.http.HttpHeaders._
+import io.netty.handler.codec.http._
+import io.netty.handler.logging.{LogLevel, LoggingHandler}
+import io.netty.util.CharsetUtil
+import play.api.libs.json._
+
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext
+import scala.io.Source
+import scala.util.{Failure, Success, Try}
+
+class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends SimpleChannelInboundHandler[FullHttpRequest] with JSONParser {
+ val ApplicationJson = "application/json"
+
+ val Ok = HttpResponseStatus.OK
+ val CloseOpt = Option(ChannelFutureListener.CLOSE)
+ val BadRequest = HttpResponseStatus.BAD_REQUEST
+ val BadGateway = HttpResponseStatus.BAD_GATEWAY
+ val NotFound = HttpResponseStatus.NOT_FOUND
+ val InternalServerError = HttpResponseStatus.INTERNAL_SERVER_ERROR
+
+ def badRoute(ctx: ChannelHandlerContext) =
+ simpleResponse(ctx, BadGateway, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+
+ def simpleResponse(ctx: ChannelHandlerContext,
+ httpResponseStatus: HttpResponseStatus,
+ byteBufOpt: Option[ByteBuf] = None,
+ headers: Seq[(String, String)] = Nil,
+ channelFutureListenerOpt: Option[ChannelFutureListener] = None): Unit = {
+
+ val res: FullHttpResponse = byteBufOpt match {
+ case None => new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus)
+ case Some(byteBuf) =>
+ new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, byteBuf)
+ }
+
+ headers.foreach { case (k, v) => res.headers().set(k, v) }
+ val channelFuture = ctx.writeAndFlush(res)
+
+ channelFutureListenerOpt match {
+ case None =>
+ case Some(listener) => channelFuture.addListener(listener)
+ }
+ }
+
+ def toResponse(ctx: ChannelHandlerContext, req: FullHttpRequest, requestBody: JsValue, result: HandlerResult, startedAt: Long) = {
+ var closeOpt = CloseOpt
+ var headers = mutable.ArrayBuilder.make[(String, String)]
+
+ headers += (Names.CONTENT_TYPE -> ApplicationJson)
+ result.headers.foreach(headers += _)
+
+ if (HttpHeaders.isKeepAlive(req)) {
+ headers += (Names.CONNECTION -> HttpHeaders.Values.KEEP_ALIVE)
+ closeOpt = None
+ }
+
+ result.body onComplete {
+ case Success(json) =>
+ val duration = System.currentTimeMillis() - startedAt
+
+ val log = s"${req.getMethod} ${req.getUri} took ${duration} ms 200 ${s2rest.calcSize(json)} ${requestBody}"
+ logger.info(log)
+
+ val buf: ByteBuf = Unpooled.copiedBuffer(json.toString, CharsetUtil.UTF_8)
+
+ headers += (Names.CONTENT_LENGTH -> buf.readableBytes().toString)
+
+ simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = closeOpt, headers = headers.result())
+ case Failure(ex) => ex match {
+ case e: BadQueryException =>
+ logger.error(s"{$requestBody}, ${e.getMessage}", e)
+ val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.badRequestResults(e).toString, CharsetUtil.UTF_8)
+ simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = closeOpt, headers = headers.result())
+ case e: Exception =>
+ logger.error(s"${requestBody}, ${e.getMessage}", e)
+ val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.emptyResults.toString, CharsetUtil.UTF_8)
+ simpleResponse(ctx, InternalServerError, byteBufOpt = Option(buf), channelFutureListenerOpt = closeOpt, headers = headers.result())
+ }
+ }
+ }
+
+ override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): Unit = {
+ val uri = req.getUri
+ val startedAt = System.currentTimeMillis()
+
+ req.getMethod match {
+ case HttpMethod.GET =>
+ uri match {
+ case "/health_check.html" =>
+ if (NettyServer.isHealthy) {
+ val healthCheckMsg = Unpooled.copiedBuffer(NettyServer.deployInfo, CharsetUtil.UTF_8)
+ simpleResponse(ctx, Ok, byteBufOpt = Option(healthCheckMsg), channelFutureListenerOpt = CloseOpt)
+ } else {
+ simpleResponse(ctx, NotFound, channelFutureListenerOpt = CloseOpt)
+ }
+
+ case s if s.startsWith("/graphs/getEdge/") =>
+ // src, tgt, label, dir
+ val Array(srcId, tgtId, labelName, direction) = s.split("/").takeRight(4)
+ val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
+ val result = s2rest.checkEdges(params)
+ toResponse(ctx, req, params, result, startedAt)
+ case _ => badRoute(ctx)
+ }
+
+ case HttpMethod.PUT =>
+ if (uri.startsWith("/health_check/")) {
+ val newHealthCheck = uri.split("/").last.toBoolean
+ NettyServer.isHealthy = newHealthCheck
+ val newHealthCheckMsg = Unpooled.copiedBuffer(NettyServer.isHealthy.toString, CharsetUtil.UTF_8)
+ simpleResponse(ctx, Ok, byteBufOpt = Option(newHealthCheckMsg), channelFutureListenerOpt = CloseOpt)
+ } else badRoute(ctx)
+
+ case HttpMethod.POST =>
+ val jsonString = req.content.toString(CharsetUtil.UTF_8)
+ val jsQuery = Json.parse(jsonString)
+
+ val result = s2rest.doPost(uri, jsQuery)
+ toResponse(ctx, req, jsQuery, result, startedAt)
+
+ case _ =>
+ simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+ }
+ }
+
+ override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
+ cause.printStackTrace()
+ logger.error(s"exception on query.", cause)
+ simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+ }
+}
+
+// Simple http server
+object NettyServer extends App {
+ /** should be the same as Bootstrap.onStart on play */
+
+ val numOfThread = Runtime.getRuntime.availableProcessors()
+ val threadPool = Executors.newFixedThreadPool(numOfThread)
+ val ec = ExecutionContext.fromExecutor(threadPool)
+
+ val config = ConfigFactory.load()
+ val port = Try(config.getInt("http.port")).recover { case _ => 9000 }.get
+
+ // init s2graph with config
+ val s2graph = new Graph(config)(ec)
+ val rest = new RestHandler(s2graph)(ec)
+
+ val deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get
+ var isHealthy = config.getBooleanWithFallback("app.health.on", true)
+
+ logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}")
+
+ // Configure the server.
+ val bossGroup: EventLoopGroup = new NioEventLoopGroup(1)
+ val workerGroup: EventLoopGroup = new NioEventLoopGroup()
+
+ try {
+ val b: ServerBootstrap = new ServerBootstrap()
+ b.option(ChannelOption.SO_BACKLOG, Int.box(2048))
+
+ b.group(bossGroup, workerGroup).channel(classOf[NioServerSocketChannel])
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new ChannelInitializer[SocketChannel] {
+ override def initChannel(ch: SocketChannel) {
+ val p = ch.pipeline()
+ p.addLast(new HttpServerCodec())
+ p.addLast(new HttpObjectAggregator(65536))
+ p.addLast(new S2RestHandler(rest)(ec))
+ }
+ })
+
+ logger.info(s"Listening for HTTP on /0.0.0.0:$port")
+ val ch: Channel = b.bind(port).sync().channel()
+ ch.closeFuture().sync()
+
+ } finally {
+ bossGroup.shutdownGracefully()
+ workerGroup.shutdownGracefully()
+ s2graph.shutdown()
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2rest_play/app/Bootstrap.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/Bootstrap.scala b/s2rest_play/app/Bootstrap.scala
index a1acb10..8005f79 100644
--- a/s2rest_play/app/Bootstrap.scala
+++ b/s2rest_play/app/Bootstrap.scala
@@ -3,7 +3,7 @@ package com.kakao.s2graph.rest
import java.util.concurrent.Executors
import actors.QueueActor
-import com.kakao.s2graph.core.rest.RequestParser
+import com.kakao.s2graph.core.rest._
import com.kakao.s2graph.core.utils.logger
import com.kakao.s2graph.core.{Management, ExceptionHandler, Graph}
import config.Config
@@ -20,6 +20,7 @@ object Global extends WithFilters(new GzipFilter()) {
var s2graph: Graph = _
var storageManagement: Management = _
var s2parser: RequestParser = _
+ var s2rest: RestHandler = _
// Application entry point
override def onStart(app: Application) {
@@ -35,6 +36,7 @@ object Global extends WithFilters(new GzipFilter()) {
s2graph = new Graph(config)(ec)
storageManagement = new Management(s2graph)
s2parser = new RequestParser(s2graph.config) // merged config
+ s2rest = new RestHandler(s2graph)(ec)
QueueActor.init(s2graph)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2rest_play/app/controllers/ApplicationController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/ApplicationController.scala b/s2rest_play/app/controllers/ApplicationController.scala
index fefe93e..d9384d9 100644
--- a/s2rest_play/app/controllers/ApplicationController.scala
+++ b/s2rest_play/app/controllers/ApplicationController.scala
@@ -1,5 +1,7 @@
package controllers
+import com.kakao.s2graph.core.GraphExceptions.BadQueryException
+import com.kakao.s2graph.core.PostProcess
import com.kakao.s2graph.core.utils.logger
import play.api.libs.iteratee.Enumerator
import play.api.libs.json.{JsString, JsValue}
@@ -15,6 +17,21 @@ object ApplicationController extends Controller {
val jsonParser: BodyParser[JsValue] = controllers.s2parse.json
+ /** Already-completed 400 response whose JSON payload is built by `PostProcess.badRequestResults`. */
+ private def badQueryExceptionResults(ex: Exception) = {
+   val payload = PostProcess.badRequestResults(ex)
+   Future.successful(BadRequest(payload).as(applicationJsonHeader))
+ }
+
+ /** Already-completed 200 response carrying `PostProcess.emptyResults` as its JSON payload. */
+ private def errorResults = {
+   val emptyOk = Ok(PostProcess.emptyResults).as(applicationJsonHeader)
+   Future.successful(emptyOk)
+ }
+
+ /**
+  * Builds the recovery handler that controllers attach to a failed query future.
+  *
+  * A `BadQueryException` is logged and answered through `badQueryExceptionResults`;
+  * any other `Exception` is logged and answered through `errorResults`. Throwables
+  * that are not `Exception`s are not matched and keep propagating.
+  *
+  * @param body request JSON, written to the error log next to the exception message
+  * @return partial function suitable for `Future.recoverWith`
+  */
+ def requestFallback(body: JsValue): PartialFunction[Throwable, Future[Result]] = {
+   case e: BadQueryException =>
+     // Log the body exactly like the generic branch does (no stray literal braces around it).
+     logger.error(s"${body}, ${e.getMessage}", e)
+     badQueryExceptionResults(e)
+   case e: Exception =>
+     logger.error(s"${body}, ${e.getMessage}", e)
+     errorResults
+ }
+
def updateHealthCheck(isHealthy: Boolean) = Action { request =>
this.isHealthy = isHealthy
Ok(this.isHealthy + "\n")
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2rest_play/app/controllers/ExperimentController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/ExperimentController.scala b/s2rest_play/app/controllers/ExperimentController.scala
index 5ae2379..40f67d1 100644
--- a/s2rest_play/app/controllers/ExperimentController.scala
+++ b/s2rest_play/app/controllers/ExperimentController.scala
@@ -1,114 +1,23 @@
package controllers
-import java.net.URL
-
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.rest.RequestParser
-import com.kakao.s2graph.core.utils.logger
-import play.api.Play.current
-import play.api.libs.json.{JsObject, JsString, JsValue, Json}
-import play.api.libs.ws.WS
+import com.kakao.s2graph.core.rest.RestHandler
import play.api.mvc._
import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
object ExperimentController extends Controller {
- val impressionKey = "S2-Impression-Id"
+ private val rest: RestHandler = com.kakao.s2graph.rest.Global.s2rest
import ApplicationController._
def experiment(accessToken: String, experimentName: String, uuid: String) = withHeaderAsync(parse.anyContent) { request =>
- val bucketOpt = for {
- service <- Service.findByAccessToken(accessToken)
- experiment <- Experiment.findBy(service.id.get, experimentName)
- bucket <- experiment.findBucket(uuid)
- } yield bucket
-
- bucketOpt match {
- case None => Future.successful(NotFound("bucket is not found."))
- case Some(bucket) =>
- try {
- if (bucket.isGraphQuery) buildRequestInner(request, bucket, uuid)
- else buildRequest(request, bucket, uuid)
- } catch {
- case e: Exception =>
- logger.error(e.toString())
- Future.successful(BadRequest(s"wrong or missing template parameter: ${e.getMessage}"))
- }
- }
- }
-
- def makeRequestJson(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): JsValue = {
- var body = bucket.requestBody.replace("#uuid", uuid)
-
- for {
- requestKeyJson <- requestKeyJsonOpt
- jsObj <- requestKeyJson.asOpt[JsObject]
- (key, value) <- jsObj.fieldSet
- } {
- val replacement = value match {
- case JsString(s) => s
- case _ => value.toString
- }
- body = body.replace(key, replacement)
- }
-
- Json.parse(body)
- }
-
- private def buildRequestInner(request: Request[AnyContent], bucket: Bucket, uuid: String): Future[Result] = {
- if (bucket.isEmpty) Future.successful(Ok(Json.obj("isEmpty" -> true)).withHeaders(impressionKey -> bucket.impressionId))
- else {
- val jsonBody = makeRequestJson(request.body.asJson, bucket, uuid)
- val url = new URL(bucket.apiPath)
- val path = url.getPath()
-
- // dummy log for sampling
- val experimentLog = s"POST $path took -1 ms 200 -1 $jsonBody"
- logger.info(experimentLog)
-
- val response = path match {
- case "/graphs/getEdges" => controllers.QueryController.getEdgesInner(jsonBody)
- case "/graphs/getEdges/grouped" => controllers.QueryController.getEdgesWithGroupingInner(jsonBody)
- case "/graphs/getEdgesExcluded" => controllers.QueryController.getEdgesExcludedInner(jsonBody)
- case "/graphs/getEdgesExcluded/grouped" => controllers.QueryController.getEdgesExcludedWithGroupingInner(jsonBody)
- case "/graphs/checkEdges" => controllers.QueryController.checkEdgesInner(jsonBody)
- case "/graphs/getEdgesGrouped" => controllers.QueryController.getEdgesGroupedInner(jsonBody)
- case "/graphs/getEdgesGroupedExcluded" => controllers.QueryController.getEdgesGroupedExcludedInner(jsonBody)
- case "/graphs/getEdgesGroupedExcludedFormatted" => controllers.QueryController.getEdgesGroupedExcludedFormattedInner(jsonBody)
- }
- response.map { r => r.withHeaders(impressionKey -> bucket.impressionId) }
- }
- }
-
- private def toSimpleMap(map: Map[String, Seq[String]]): Map[String, String] = {
- for {
- (k, vs) <- map
- headVal <- vs.headOption
- } yield {
- k -> headVal
- }
- }
-
- private def buildRequest(request: Request[AnyContent], bucket: Bucket, uuid: String): Future[Result] = {
- val jsonBody = makeRequestJson(request.body.asJson, bucket, uuid)
-
- val url = bucket.apiPath
- val headers = request.headers.toSimpleMap.toSeq
- val verb = bucket.httpVerb.toUpperCase
- val qs = toSimpleMap(request.queryString).toSeq
-
- val ws = WS.url(url)
- .withMethod(verb)
- .withBody(jsonBody)
- .withHeaders(headers: _*)
- .withQueryString(qs: _*)
+ // A request without a JSON body must not throw on Option.get before the
+ // recoverWith further down is attached; fall back to an empty JSON object.
+ // NOTE(review): assumes the handler treats {} as "no template parameters" - confirm against RestHandler.
+ val body: play.api.libs.json.JsValue =
+   request.body.asJson.getOrElse(play.api.libs.json.Json.obj())
+ val res = rest.doPost(request.uri, body)
- ws.stream().map {
- case (proxyResponse, proxyBody) =>
- Result(ResponseHeader(proxyResponse.status, proxyResponse.headers.mapValues(_.toList.head)), proxyBody).withHeaders(impressionKey -> bucket.impressionId)
- }
+ res.body.map { case js =>
+ val headers = res.headers :+ ("result_size" -> rest.calcSize(js).toString)
+ jsonResponse(js, headers: _*)
+ } recoverWith ApplicationController.requestFallback(body)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2rest_play/app/controllers/QueryController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/QueryController.scala b/s2rest_play/app/controllers/QueryController.scala
index 480c142..e836259 100644
--- a/s2rest_play/app/controllers/QueryController.scala
+++ b/s2rest_play/app/controllers/QueryController.scala
@@ -1,311 +1,48 @@
package controllers
-import com.kakao.s2graph.core.GraphExceptions.BadQueryException
import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.types.{LabelWithDirection, VertexId}
-import com.kakao.s2graph.core.utils.logger
-import config.Config
-import play.api.libs.json.{JsArray, JsObject, JsValue, Json}
-import play.api.mvc.{Action, Controller, Result}
+import com.kakao.s2graph.core.rest.RestHandler
+import play.api.libs.json.{JsValue, Json}
+import play.api.mvc.{Controller, Request}
-import scala.concurrent._
import scala.language.postfixOps
-import scala.util.Try
-object QueryController extends Controller {
+object QueryController extends Controller with JSONParser {
import ApplicationController._
import play.api.libs.concurrent.Execution.Implicits.defaultContext
- private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph
- private val requestParser = com.kakao.s2graph.rest.Global.s2parser
+ private val rest: RestHandler = com.kakao.s2graph.rest.Global.s2rest
- private def badQueryExceptionResults(ex: Exception) = Future.successful(BadRequest(Json.obj("message" -> ex.getMessage)).as(applicationJsonHeader))
+ /**
+  * Shared implementation of the JSON query endpoints: hands the request URI and
+  * JSON body to the `RestHandler`, renders the resulting JSON with a `result_size`
+  * header, and routes failures through `ApplicationController.requestFallback`.
+  */
+ def delegate(request: Request[JsValue]) = {
+   val rendered = rest.doPost(request.uri, request.body).body.map { json =>
+     val size = rest.calcSize(json).toString
+     jsonResponse(json, "result_size" -> size)
+   }
+   rendered.recoverWith(ApplicationController.requestFallback(request.body))
+ }
- private def errorResults = Future.successful(Ok(PostProcess.timeoutResults).as(applicationJsonHeader))
+ def getEdges() = withHeaderAsync(jsonParser)(delegate)
- def getEdges() = withHeaderAsync(jsonParser) { request =>
- getEdgesInner(request.body)
- }
+ def getEdgesWithGrouping() = withHeaderAsync(jsonParser)(delegate)
- def getEdgesExcluded = withHeaderAsync(jsonParser) { request =>
- getEdgesExcludedInner(request.body)
- }
+ def getEdgesExcluded() = withHeaderAsync(jsonParser)(delegate)
- private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = {
- val filterOutQueryResultsLs = q.filterOutQuery match {
- case Some(filterOutQuery) => s2.getEdges(filterOutQuery)
- case None => Future.successful(Seq.empty)
- }
-
- for {
- queryResultsLs <- s2.getEdges(q)
- filterOutResultsLs <- filterOutQueryResultsLs
- } yield {
- val json = post(queryResultsLs, filterOutResultsLs)
- json
- }
- }
-
- private def calcSize(js: JsValue): Int = js match {
- case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0)
- case JsArray(seq) => seq.map(js => (js \ "size").asOpt[Int].getOrElse(0)).sum
- case _ => 0
- }
-
- private def getEdgesAsync(jsonQuery: JsValue)
- (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[Result] = {
- if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader)
- val fetch = eachQuery(post) _
-// logger.info(jsonQuery)
-
- Try {
- val future = jsonQuery match {
- case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray)
- case obj@JsObject(_) => fetch(requestParser.toQuery(obj))
- case _ => throw BadQueryException("Cannot support")
- }
-
- future map { json => jsonResponse(json, "result_size" -> calcSize(json).toString) }
-
- } recover {
- case e: BadQueryException =>
- logger.error(s"$jsonQuery, $e", e)
- badQueryExceptionResults(e)
- case e: Exception =>
- logger.error(s"$jsonQuery, $e", e)
- errorResults
- } get
- }
-
- @deprecated(message = "deprecated", since = "0.2")
- private def getEdgesExcludedAsync(jsonQuery: JsValue)
- (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[Result] = {
-
- if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader)
-
- Try {
- val q = requestParser.toQuery(jsonQuery)
- val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
-
- val fetchFuture = s2.getEdges(q)
- val excludeFuture = s2.getEdges(filterOutQuery)
-
- for {
- queryResultLs <- fetchFuture
- exclude <- excludeFuture
- } yield {
- val json = post(queryResultLs, exclude)
- jsonResponse(json, "result_size" -> calcSize(json).toString)
- }
- } recover {
- case e: BadQueryException =>
- logger.error(s"$jsonQuery, $e", e)
- badQueryExceptionResults(e)
- case e: Exception =>
- logger.error(s"$jsonQuery, $e", e)
- errorResults
- } get
- }
-
- def getEdgesInner(jsonQuery: JsValue) = {
- getEdgesAsync(jsonQuery)(PostProcess.toSimpleVertexArrJson)
- }
-
- def getEdgesExcludedInner(jsValue: JsValue) = {
- getEdgesExcludedAsync(jsValue)(PostProcess.toSimpleVertexArrJson)
- }
-
- def getEdgesWithGrouping() = withHeaderAsync(jsonParser) { request =>
- getEdgesWithGroupingInner(request.body)
- }
-
- def getEdgesWithGroupingInner(jsonQuery: JsValue) = {
- getEdgesAsync(jsonQuery)(PostProcess.summarizeWithListFormatted)
- }
-
- def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonParser) { request =>
- getEdgesExcludedWithGroupingInner(request.body)
- }
-
- def getEdgesExcludedWithGroupingInner(jsonQuery: JsValue) = {
- getEdgesExcludedAsync(jsonQuery)(PostProcess.summarizeWithListExcludeFormatted)
- }
+ def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonParser)(delegate)
- def getEdgesGroupedInner(jsonQuery: JsValue) = {
- getEdgesAsync(jsonQuery)(PostProcess.summarizeWithList)
- }
+ def checkEdges() = withHeaderAsync(jsonParser)(delegate)
- @deprecated(message = "deprecated", since = "0.2")
- def getEdgesGrouped() = withHeaderAsync(jsonParser) { request =>
- getEdgesGroupedInner(request.body)
- }
+ def getEdgesGrouped() = withHeaderAsync(jsonParser)(delegate)
- @deprecated(message = "deprecated", since = "0.2")
- def getEdgesGroupedExcluded() = withHeaderAsync(jsonParser) { request =>
- getEdgesGroupedExcludedInner(request.body)
- }
+ def getEdgesGroupedExcluded() = withHeaderAsync(jsonParser)(delegate)
- @deprecated(message = "deprecated", since = "0.2")
- def getEdgesGroupedExcludedInner(jsonQuery: JsValue): Future[Result] = {
- if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader)
+ def getEdgesGroupedExcludedFormatted() = withHeaderAsync(jsonParser)(delegate)
- Try {
- val q = requestParser.toQuery(jsonQuery)
- val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
-
- val fetchFuture = s2.getEdges(q)
- val excludeFuture = s2.getEdges(filterOutQuery)
-
- for {
- queryResultLs <- fetchFuture
- exclude <- excludeFuture
- } yield {
- val json = PostProcess.summarizeWithListExclude(queryResultLs, exclude)
- jsonResponse(json, "result_size" -> calcSize(json).toString)
- }
- } recover {
- case e: BadQueryException =>
- logger.error(s"$jsonQuery, $e", e)
- badQueryExceptionResults(e)
- case e: Exception =>
- logger.error(s"$jsonQuery, $e", e)
- errorResults
- } get
- }
-
- @deprecated(message = "deprecated", since = "0.2")
- def getEdgesGroupedExcludedFormatted = withHeaderAsync(jsonParser) { request =>
- getEdgesGroupedExcludedFormattedInner(request.body)
- }
-
- @deprecated(message = "deprecated", since = "0.2")
- def getEdgesGroupedExcludedFormattedInner(jsonQuery: JsValue): Future[Result] = {
- if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader)
-
- Try {
- val q = requestParser.toQuery(jsonQuery)
- val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
-
- val fetchFuture = s2.getEdges(q)
- val excludeFuture = s2.getEdges(filterOutQuery)
-
- for {
- queryResultLs <- fetchFuture
- exclude <- excludeFuture
- } yield {
- val json = PostProcess.summarizeWithListExcludeFormatted(queryResultLs, exclude)
- jsonResponse(json, "result_size" -> calcSize(json).toString)
- }
- } recover {
- case e: BadQueryException =>
- logger.error(s"$jsonQuery, $e", e)
- badQueryExceptionResults(e)
- case e: Exception =>
- logger.error(s"$jsonQuery, $e", e)
- errorResults
- } get
- }
-
- def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) = Action.async { request =>
- if (!Config.IS_QUERY_SERVER) Future.successful(Unauthorized)
- val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
- checkEdgesInner(params)
- }
-
- /**
- * Vertex
- */
-
- def checkEdgesInner(jsValue: JsValue) = {
- try {
- val params = jsValue.as[List[JsValue]]
- var isReverted = false
- val labelWithDirs = scala.collection.mutable.HashSet[LabelWithDirection]()
- val quads = for {
- param <- params
- labelName <- (param \ "label").asOpt[String]
- direction <- GraphUtil.toDir((param \ "direction").asOpt[String].getOrElse("out"))
- label <- Label.findByName(labelName)
- srcId <- requestParser.jsValueToInnerVal((param \ "from").as[JsValue], label.srcColumnWithDir(direction.toInt).columnType, label.schemaVersion)
- tgtId <- requestParser.jsValueToInnerVal((param \ "to").as[JsValue], label.tgtColumnWithDir(direction.toInt).columnType, label.schemaVersion)
- } yield {
- val labelWithDir = LabelWithDirection(label.id.get, direction)
- labelWithDirs += labelWithDir
- val (src, tgt, dir) = if (direction == 1) {
- isReverted = true
- (Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)),
- Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), 0)
- } else {
- (Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)),
- Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), 0)
- }
-
- // logger.debug(s"SrcVertex: $src")
- // logger.debug(s"TgtVertex: $tgt")
- // logger.debug(s"direction: $dir")
- (src, tgt, QueryParam(LabelWithDirection(label.id.get, dir)))
- }
-
- s2.checkEdges(quads).map { case queryRequestWithResultLs =>
- val edgeJsons = for {
- queryRequestWithResult <- queryRequestWithResultLs
- (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
- edgeWithScore <- queryResult.edgeWithScoreLs
- (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
- convertedEdge = if (isReverted) edge.duplicateEdge else edge
- edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam)
- } yield Json.toJson(edgeJson)
-
- val json = Json.toJson(edgeJsons)
- jsonResponse(json, "result_size" -> edgeJsons.size.toString)
- }
- } catch {
- case e: Exception =>
- logger.error(s"$jsValue, $e", e)
- errorResults
+ /**
+  * Checks a single edge described entirely by the path parameters.
+  *
+  * The query JSON is synthesized from the arguments, so the request body is never
+  * read; the action therefore accepts any content instead of requiring a JSON body.
+  * NOTE(review): assumes the custom JSON body parser may reject body-less requests - confirm.
+  *
+  * @param srcId     source vertex id ("from")
+  * @param tgtId     target vertex id ("to")
+  * @param labelName label of the edge
+  * @param direction edge direction as given in the path
+  */
+ def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) =
+   withHeaderAsync(parse.anyContent) { _ =>
+     val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
+     rest.checkEdges(params).body.map { js =>
+       jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
+     // Log the synthesized query rather than the unused request body.
+     } recoverWith ApplicationController.requestFallback(params)
}
- }
-
- def checkEdges() = withHeaderAsync(jsonParser) { request =>
- if (!Config.IS_QUERY_SERVER) Future.successful(Unauthorized)
-
- checkEdgesInner(request.body)
- }
-
- def getVertices() = withHeaderAsync(jsonParser) { request =>
- getVerticesInner(request.body)
- }
-
- def getVerticesInner(jsValue: JsValue) = {
- if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader)
-
- val jsonQuery = jsValue
- val ts = System.currentTimeMillis()
- val props = "{}"
-
- Try {
- val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
- val serviceName = (js \ "serviceName").as[String]
- val columnName = (js \ "columnName").as[String]
- for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield {
- Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props)
- }
- }
- s2.getVertices(vertices) map { vertices =>
- val json = PostProcess.verticesToJson(vertices)
- jsonResponse(json, "result_size" -> calcSize(json).toString)
- }
- } recover {
- case e: play.api.libs.json.JsResultException =>
- logger.error(s"$jsonQuery, $e", e)
- badQueryExceptionResults(e)
- case e: Exception =>
- logger.error(s"$jsonQuery, $e", e)
- errorResults
- } get
- }
+ def getVertices() = withHeaderAsync(jsonParser)(delegate)
}