You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by da...@apache.org on 2016/01/18 11:10:36 UTC

incubator-s2graph git commit: [S2GRAPH-9] Provide rest server using netty

Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 66be5c8c3 -> 2b63878cc


[S2GRAPH-9] Provide rest server using netty

    Move common code for REST into s2core.
    Move test cases into s2core.
    Add Netty server.

JIRA:
  [S2GRAPH-9] https://issues.apache.org/jira/browse/S2GRAPH-9

Pull Request:
  Closes #8


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2b63878c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2b63878c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2b63878c

Branch: refs/heads/master
Commit: 2b63878cc3850b8308fb123c824870066efbef09
Parents: 66be5c8
Author: daewon <da...@apache.org>
Authored: Mon Jan 18 17:37:13 2016 +0900
Committer: daewon <da...@apache.org>
Committed: Mon Jan 18 17:37:13 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 build.sbt                                       |   4 +
 s2core/src/main/resources/reference.conf        |  22 +-
 .../com/kakao/s2graph/core/PostProcess.scala    |   1 -
 .../kakao/s2graph/core/mysqls/Experiment.scala  |   5 +-
 .../kakao/s2graph/core/rest/RequestParser.scala |   2 +-
 .../kakao/s2graph/core/rest/RestCaller.scala    | 183 -----------
 .../kakao/s2graph/core/rest/RestHandler.scala   | 199 ++++++++++++
 .../kakao/s2graph/core/utils/Extentions.scala   |   7 +
 .../s2graph/core/parsers/WhereParserTest.scala  |   2 +
 s2rest_netty/src/main/scala/Server.scala        | 200 ++++++++++++
 s2rest_play/app/Bootstrap.scala                 |   4 +-
 .../app/controllers/ApplicationController.scala |  17 +
 .../app/controllers/ExperimentController.scala  | 107 +------
 .../app/controllers/QueryController.scala       | 311 ++-----------------
 15 files changed, 489 insertions(+), 577 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 188d194..f9c6964 100644
--- a/CHANGES
+++ b/CHANGES
@@ -18,4 +18,6 @@ Release 0.12.1 - unreleased
 
   SUB TASKS
 
+    S2GRAPH-9: Provide rest server using netty. (Committed by daewon).
+
     S2GRAPH-7: Abstract common codes for rest project into s2core. (Committed by daewon).

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index 30e5b64..5efa284 100755
--- a/build.sbt
+++ b/build.sbt
@@ -26,6 +26,10 @@ lazy val s2rest_play = project.enablePlugins(PlayScala)
   .settings(commonSettings: _*)
   .settings(testOptions in Test += Tests.Argument("sequential"))
 
+lazy val s2rest_netty = project
+  .dependsOn(s2core)
+  .settings(commonSettings: _*)
+
 lazy val s2core = project.settings(commonSettings: _*)
 
 lazy val spark = project.settings(commonSettings: _*)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/reference.conf b/s2core/src/main/resources/reference.conf
index b527f7f..cef0284 100644
--- a/s2core/src/main/resources/reference.conf
+++ b/s2core/src/main/resources/reference.conf
@@ -2,8 +2,28 @@
 phase=dev
 host=localhost
 
+# Hbase
+hbase.table.compression.algorithm="gz"
 hbase.zookeeper.quorum=${host}
 
+# Asynchbase
+hbase.client.retries.number=100
+hbase.rpcs.buffered_flush_interval=100
+hbase.rpc.timeout=0
+
+# local retry number
+max.retry.number=100
+max.back.off=50
+
+# Future cache.
+future.cache.max.size=100000
+future.cache.expire.after.write=10000
+future.cache.expire.after.access=5000
+
+# Local Cache
+cache.ttl.seconds=60
+cache.max.size=100000
+
 # DB
 s2graph.models.table.name="models-dev"
 db.default.driver="com.mysql.jdbc.Driver"
@@ -11,8 +31,6 @@ db.default.url="jdbc:mysql://"${host}":3306/graph_dev"
 db.default.user="graph"
 db.default.password="graph"
 
-cache.max.size=100000
-
 akka {
   loggers = ["akka.event.slf4j.Slf4jLogger"]
   loglevel = "DEBUG"

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
index 0a26d26..56d8591 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
@@ -323,7 +323,6 @@ object PostProcess extends JSONParser {
       to <- innerValToJsValue(edge.tgtVertex.id.innerId, tgtColumn.columnType)
     } yield {
       val targetColumns = if (q.selectColumnsSet.isEmpty) reservedColumns else (reservedColumns & q.selectColumnsSet) + "props"
-
       val _propsMap = queryParam.label.metaPropsDefaultMapInner ++ propsToJson(edge, q, queryParam)
       val propsMap = if (q.selectColumnsSet.nonEmpty) _propsMap.filterKeys(q.selectColumnsSet) else _propsMap
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
index 93cc374..d484914 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
@@ -5,10 +5,9 @@ import scalikejdbc._
 
 import scala.util.Random
 
-/**
- * Created by shon on 8/5/15.
- */
 object Experiment extends Model[Experiment] {
+  val impressionKey = "S2-Impression-Id"
+
   def apply(rs: WrappedResultSet): Experiment = {
     Experiment(rs.intOpt("id"),
       rs.int("service_id"),

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
index 98cc68b..78fc375 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
@@ -6,6 +6,7 @@ import com.kakao.s2graph.core.mysqls._
 import com.kakao.s2graph.core.parsers.WhereParser
 import com.kakao.s2graph.core.types._
 import com.typesafe.config.Config
+
 import play.api.libs.json._
 
 import scala.util.{Failure, Success, Try}
@@ -482,7 +483,6 @@ class RequestParser(config: Config) extends JSONParser {
       }
       (src, tgt, QueryParam(LabelWithDirection(label.id.get, dir)))
     }
-
     (quads, isReverted)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala
deleted file mode 100644
index bef1dec..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-package com.kakao.s2graph.core.rest
-
-import java.net.URL
-
-import com.kakao.s2graph.core.GraphExceptions.BadQueryException
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.{Bucket, Experiment, Service}
-import com.kakao.s2graph.core.utils.logger
-import play.api.libs.json._
-
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.Try
-
-/**
-  * Public API only return Future.successful or Future.failed
-  * Don't throw exception
-  */
-class RestCaller(graph: Graph)(implicit ec: ExecutionContext) {
-  val s2Parser = new RequestParser(graph.config)
-
-  /**
-    * Public APIS
-    */
-  def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String): Future[(JsValue, String)] = {
-    try {
-      val bucketOpt = for {
-        service <- Service.findByAccessToken(accessToken)
-        experiment <- Experiment.findBy(service.id.get, experimentName)
-        bucket <- experiment.findBucket(uuid)
-      } yield bucket
-
-      val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found"))
-      if (bucket.isGraphQuery) buildRequestInner(contentsBody, bucket, uuid).map(_ -> bucket.impressionId)
-      else throw new RuntimeException("not supported yet")
-    } catch {
-      case e: Exception => Future.failed(e)
-    }
-  }
-
-  def uriMatch(uri: String, jsQuery: JsValue): Future[JsValue] = {
-    try {
-      uri match {
-        case "/graphs/getEdges" => getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)
-        case "/graphs/getEdges/grouped" => getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted)
-        case "/graphs/getEdgesExcluded" => getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)
-        case "/graphs/getEdgesExcluded/grouped" => getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)
-        case "/graphs/checkEdges" => checkEdges(jsQuery)
-        case "/graphs/getEdgesGrouped" => getEdgesAsync(jsQuery)(PostProcess.summarizeWithList)
-        case "/graphs/getEdgesGroupedExcluded" => getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude)
-        case "/graphs/getEdgesGroupedExcludedFormatted" => getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)
-        case "/graphs/getVertices" => getVertices(jsQuery)
-        case _ => throw new RuntimeException("route is not found")
-      }
-    } catch {
-      case e: Exception => Future.failed(e)
-    }
-  }
-
-  def checkEdges(jsValue: JsValue): Future[JsValue] = {
-    try {
-      val (quads, isReverted) = s2Parser.toCheckEdgeParam(jsValue)
-
-      graph.checkEdges(quads).map { case queryRequestWithResultLs =>
-        val edgeJsons = for {
-          queryRequestWithResult <- queryRequestWithResultLs
-          (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-          edgeWithScore <- queryResult.edgeWithScoreLs
-          (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-          convertedEdge = if (isReverted) edge.duplicateEdge else edge
-          edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam)
-        } yield Json.toJson(edgeJson)
-
-        Json.toJson(edgeJsons)
-      }
-    } catch {
-      case e: Exception => Future.failed(e)
-    }
-  }
-
-  /**
-    * Private APIS
-    */
-  private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = {
-    val filterOutQueryResultsLs = q.filterOutQuery match {
-      case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
-      case None => Future.successful(Seq.empty)
-    }
-
-    for {
-      queryResultsLs <- graph.getEdges(q)
-      filterOutResultsLs <- filterOutQueryResultsLs
-    } yield {
-      val json = post(queryResultsLs, filterOutResultsLs)
-      json
-    }
-  }
-
-  private def getEdgesAsync(jsonQuery: JsValue)
-                           (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
-
-    val fetch = eachQuery(post) _
-    jsonQuery match {
-      case JsArray(arr) => Future.traverse(arr.map(s2Parser.toQuery(_)))(fetch).map(JsArray)
-      case obj@JsObject(_) => fetch(s2Parser.toQuery(obj))
-      case _ => throw BadQueryException("Cannot support")
-    }
-  }
-
-  private def getEdgesExcludedAsync(jsonQuery: JsValue)
-                                   (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
-    val q = s2Parser.toQuery(jsonQuery)
-    val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
-
-    val fetchFuture = graph.getEdges(q)
-    val excludeFuture = graph.getEdges(filterOutQuery)
-
-    for {
-      queryResultLs <- fetchFuture
-      exclude <- excludeFuture
-    } yield {
-      post(queryResultLs, exclude)
-    }
-  }
-
-  private def getVertices(jsValue: JsValue) = {
-    val jsonQuery = jsValue
-    val ts = System.currentTimeMillis()
-    val props = "{}"
-
-    val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
-      val serviceName = (js \ "serviceName").as[String]
-      val columnName = (js \ "columnName").as[String]
-      for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield {
-        Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props)
-      }
-    }
-
-    graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) }
-  }
-
-
-  private def makeRequestJson(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): JsValue = {
-    var body = bucket.requestBody.replace("#uuid", uuid)
-    for {
-      requestKeyJson <- requestKeyJsonOpt
-      jsObj <- requestKeyJson.asOpt[JsObject]
-      (key, value) <- jsObj.fieldSet
-    } {
-      val replacement = value match {
-        case JsString(s) => s
-        case _ => value.toString
-      }
-      body = body.replace(key, replacement)
-    }
-
-    Try(Json.parse(body)).recover {
-      case e: Exception =>
-        throw new BadQueryException(s"wrong or missing template parameter: ${e.getMessage.takeWhile(_ != '\n')}")
-    } get
-  }
-
-  private def buildRequestInner(contentsBody: JsValue, bucket: Bucket, uuid: String): Future[JsValue] = {
-    if (bucket.isEmpty) Future.successful(PostProcess.emptyResults)
-    else {
-      val jsonBody = makeRequestJson(Option(contentsBody), bucket, uuid)
-      val url = new URL(bucket.apiPath)
-      val path = url.getPath()
-
-      // dummy log for sampling
-      val experimentLog = s"POST $path took -1 ms 200 -1 $jsonBody"
-
-      logger.info(experimentLog)
-
-      uriMatch(path, jsonBody)
-    }
-  }
-
-  def calcSize(js: JsValue): Int = js match {
-    case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0)
-    case JsArray(seq) => seq.map(js => (js \ "size").asOpt[Int].getOrElse(0)).sum
-    case _ => 0
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
new file mode 100644
index 0000000..9647314
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
@@ -0,0 +1,199 @@
+package com.kakao.s2graph.core.rest
+
+import java.net.URL
+
+import com.kakao.s2graph.core.GraphExceptions.BadQueryException
+import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.mysqls.{Bucket, Experiment, Service}
+import com.kakao.s2graph.core.utils.logger
+import play.api.libs.json._
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Try
+
+
+object RestHandler {
+  case class HandlerResult(body: Future[JsValue], headers: (String, String)*)
+}
+
+/**
+  * Public API: methods only return Future.successful or Future.failed
+  * (wrapped in a HandlerResult); they do not throw exceptions.
+  */
+class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
+
+  import RestHandler._
+
+  val s2Parser = new RequestParser(graph.config)
+
+  /**
+    * Public APIs
+    */
+  def doPost(uri: String, jsQuery: JsValue): HandlerResult = {
+    try {
+      uri match {
+        case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
+        case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted))
+        case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
+        case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
+        case "/graphs/checkEdges" => checkEdges(jsQuery)
+        case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList))
+        case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude))
+        case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
+        case "/graphs/getVertices" => HandlerResult(getVertices(jsQuery))
+        case uri if uri.startsWith("/graphs/experiment") =>
+          val Array(accessToken, experimentName, uuid) = uri.split("/").takeRight(3)
+          experiment(jsQuery, accessToken, experimentName, uuid)
+        case _ => throw new RuntimeException("route is not found")
+      }
+    } catch {
+      case e: Exception => HandlerResult(Future.failed(e))
+    }
+  }
+
+  // TODO: Refactor to doGet
+  def checkEdges(jsValue: JsValue): HandlerResult = {
+    try {
+      val (quads, isReverted) = s2Parser.toCheckEdgeParam(jsValue)
+
+      HandlerResult(graph.checkEdges(quads).map { case queryRequestWithResultLs =>
+        val edgeJsons = for {
+          queryRequestWithResult <- queryRequestWithResultLs
+          (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
+          edgeWithScore <- queryResult.edgeWithScoreLs
+          (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+          convertedEdge = if (isReverted) edge.duplicateEdge else edge
+          edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam)
+        } yield Json.toJson(edgeJson)
+
+        Json.toJson(edgeJsons)
+      })
+    } catch {
+      case e: Exception => HandlerResult(Future.failed(e))
+    }
+  }
+
+
+  /**
+    * Private APIs
+    */
+  private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String): HandlerResult = {
+    try {
+      val bucketOpt = for {
+        service <- Service.findByAccessToken(accessToken)
+        experiment <- Experiment.findBy(service.id.get, experimentName)
+        bucket <- experiment.findBucket(uuid)
+      } yield bucket
+
+      val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found"))
+      if (bucket.isGraphQuery) {
+        val ret = buildRequestInner(contentsBody, bucket, uuid)
+        HandlerResult(ret.body, Experiment.impressionKey -> bucket.impressionId)
+      }
+      else throw new RuntimeException("not supported yet")
+    } catch {
+      case e: Exception => HandlerResult(Future.failed(e))
+    }
+  }
+
+  private def buildRequestInner(contentsBody: JsValue, bucket: Bucket, uuid: String): HandlerResult = {
+    if (bucket.isEmpty) HandlerResult(Future.successful(PostProcess.emptyResults))
+    else {
+      val jsonBody = makeRequestJson(Option(contentsBody), bucket, uuid)
+      val url = new URL(bucket.apiPath)
+      val path = url.getPath()
+
+      // dummy log for sampling
+      val experimentLog = s"POST $path took -1 ms 200 -1 $jsonBody"
+
+      logger.info(experimentLog)
+
+      doPost(path, jsonBody)
+    }
+  }
+
+  private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = {
+    val filterOutQueryResultsLs = q.filterOutQuery match {
+      case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
+      case None => Future.successful(Seq.empty)
+    }
+
+    for {
+      queryResultsLs <- graph.getEdges(q)
+      filterOutResultsLs <- filterOutQueryResultsLs
+    } yield {
+      val json = post(queryResultsLs, filterOutResultsLs)
+      json
+    }
+  }
+
+  private def getEdgesAsync(jsonQuery: JsValue)
+                           (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
+
+    val fetch = eachQuery(post) _
+    jsonQuery match {
+      case JsArray(arr) => Future.traverse(arr.map(s2Parser.toQuery(_)))(fetch).map(JsArray)
+      case obj@JsObject(_) => fetch(s2Parser.toQuery(obj))
+      case _ => throw BadQueryException("Cannot support")
+    }
+  }
+
+  private def getEdgesExcludedAsync(jsonQuery: JsValue)
+                                   (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
+    val q = s2Parser.toQuery(jsonQuery)
+    val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
+
+    val fetchFuture = graph.getEdges(q)
+    val excludeFuture = graph.getEdges(filterOutQuery)
+
+    for {
+      queryResultLs <- fetchFuture
+      exclude <- excludeFuture
+    } yield {
+      post(queryResultLs, exclude)
+    }
+  }
+
+  private def getVertices(jsValue: JsValue) = {
+    val jsonQuery = jsValue
+    val ts = System.currentTimeMillis()
+    val props = "{}"
+
+    val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
+      val serviceName = (js \ "serviceName").as[String]
+      val columnName = (js \ "columnName").as[String]
+      for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield {
+        Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props)
+      }
+    }
+
+    graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) }
+  }
+
+
+  private def makeRequestJson(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): JsValue = {
+    var body = bucket.requestBody.replace("#uuid", uuid)
+    for {
+      requestKeyJson <- requestKeyJsonOpt
+      jsObj <- requestKeyJson.asOpt[JsObject]
+      (key, value) <- jsObj.fieldSet
+    } {
+      val replacement = value match {
+        case JsString(s) => s
+        case _ => value.toString
+      }
+      body = body.replace(key, replacement)
+    }
+
+    Try(Json.parse(body)).recover {
+      case e: Exception =>
+        throw new BadQueryException(s"wrong or missing template parameter: ${e.getMessage.takeWhile(_ != '\n')}")
+    } get
+  }
+
+  def calcSize(js: JsValue): Int = js match {
+    case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0)
+    case JsArray(seq) => seq.map(js => (js \ "size").asOpt[Int].getOrElse(0)).sum
+    case _ => 0
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
index 3812c2c..4858f60 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
@@ -1,6 +1,8 @@
 package com.kakao.s2graph.core.utils
 
 import com.stumbleupon.async.{Callback, Deferred}
+import com.typesafe.config.Config
+
 import scala.concurrent.{ExecutionContext, Future, Promise}
 
 object Extensions {
@@ -64,4 +66,9 @@ object Extensions {
 
   }
 
+  implicit class ConfigOps(config: Config) {
+    def getBooleanWithFallback(key: String, defaultValue: Boolean): Boolean =
+      if (config.hasPath(key)) config.getBoolean(key) else defaultValue
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
index 659983c..de19a75 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
@@ -7,6 +7,8 @@ import org.scalatest.{FunSuite, Matchers}
 import play.api.libs.json.Json
 
 class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels {
+  initTests()
+
   // dummy data for dummy edge
   initTests()
   

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2rest_netty/src/main/scala/Server.scala
----------------------------------------------------------------------
diff --git a/s2rest_netty/src/main/scala/Server.scala b/s2rest_netty/src/main/scala/Server.scala
new file mode 100644
index 0000000..7290b8a
--- /dev/null
+++ b/s2rest_netty/src/main/scala/Server.scala
@@ -0,0 +1,200 @@
+package com.kakao.s2graph.rest.netty
+
+import java.util.concurrent.Executors
+
+import com.kakao.s2graph.core.GraphExceptions.BadQueryException
+import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.rest.RestHandler.HandlerResult
+import com.kakao.s2graph.core.rest._
+import com.kakao.s2graph.core.utils.Extensions._
+import com.kakao.s2graph.core.utils.logger
+import com.typesafe.config.ConfigFactory
+import io.netty.bootstrap.ServerBootstrap
+import io.netty.buffer.{ByteBuf, Unpooled}
+import io.netty.channel._
+import io.netty.channel.nio.NioEventLoopGroup
+import io.netty.channel.socket.SocketChannel
+import io.netty.channel.socket.nio.NioServerSocketChannel
+import io.netty.handler.codec.http.HttpHeaders._
+import io.netty.handler.codec.http._
+import io.netty.handler.logging.{LogLevel, LoggingHandler}
+import io.netty.util.CharsetUtil
+import play.api.libs.json._
+
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext
+import scala.io.Source
+import scala.util.{Failure, Success, Try}
+
+class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends SimpleChannelInboundHandler[FullHttpRequest] with JSONParser {
+  val ApplicationJson = "application/json"
+
+  val Ok = HttpResponseStatus.OK
+  val CloseOpt = Option(ChannelFutureListener.CLOSE)
+  val BadRequest = HttpResponseStatus.BAD_REQUEST
+  val BadGateway = HttpResponseStatus.BAD_GATEWAY
+  val NotFound = HttpResponseStatus.NOT_FOUND
+  val InternalServerError = HttpResponseStatus.INTERNAL_SERVER_ERROR
+
+  def badRoute(ctx: ChannelHandlerContext) =
+    simpleResponse(ctx, BadGateway, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+
+  def simpleResponse(ctx: ChannelHandlerContext,
+                     httpResponseStatus: HttpResponseStatus,
+                     byteBufOpt: Option[ByteBuf] = None,
+                     headers: Seq[(String, String)] = Nil,
+                     channelFutureListenerOpt: Option[ChannelFutureListener] = None): Unit = {
+
+    val res: FullHttpResponse = byteBufOpt match {
+      case None => new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus)
+      case Some(byteBuf) =>
+        new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, byteBuf)
+    }
+
+    headers.foreach { case (k, v) => res.headers().set(k, v) }
+    val channelFuture = ctx.writeAndFlush(res)
+
+    channelFutureListenerOpt match {
+      case None =>
+      case Some(listener) => channelFuture.addListener(listener)
+    }
+  }
+
+  def toResponse(ctx: ChannelHandlerContext, req: FullHttpRequest, requestBody: JsValue, result: HandlerResult, startedAt: Long) = {
+    var closeOpt = CloseOpt
+    var headers = mutable.ArrayBuilder.make[(String, String)]
+
+    headers += (Names.CONTENT_TYPE -> ApplicationJson)
+    result.headers.foreach(headers += _)
+
+    if (HttpHeaders.isKeepAlive(req)) {
+      headers += (Names.CONNECTION -> HttpHeaders.Values.KEEP_ALIVE)
+      closeOpt = None
+    }
+
+    result.body onComplete {
+      case Success(json) =>
+        val duration = System.currentTimeMillis() - startedAt
+
+        val log = s"${req.getMethod} ${req.getUri} took ${duration} ms 200 ${s2rest.calcSize(json)} ${requestBody}"
+        logger.info(log)
+
+        val buf: ByteBuf = Unpooled.copiedBuffer(json.toString, CharsetUtil.UTF_8)
+
+        headers += (Names.CONTENT_LENGTH -> buf.readableBytes().toString)
+
+        simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = closeOpt, headers = headers.result())
+      case Failure(ex) => ex match {
+        case e: BadQueryException =>
+          logger.error(s"{$requestBody}, ${e.getMessage}", e)
+          val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.badRequestResults(e).toString, CharsetUtil.UTF_8)
+          simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = closeOpt, headers = headers.result())
+        case e: Exception =>
+          logger.error(s"${requestBody}, ${e.getMessage}", e)
+          val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.emptyResults.toString, CharsetUtil.UTF_8)
+          simpleResponse(ctx, InternalServerError, byteBufOpt = Option(buf), channelFutureListenerOpt = closeOpt, headers = headers.result())
+      }
+    }
+  }
+
+  override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): Unit = {
+    val uri = req.getUri
+    val startedAt = System.currentTimeMillis()
+
+    req.getMethod match {
+      case HttpMethod.GET =>
+        uri match {
+          case "/health_check.html" =>
+            if (NettyServer.isHealthy) {
+              val healthCheckMsg = Unpooled.copiedBuffer(NettyServer.deployInfo, CharsetUtil.UTF_8)
+              simpleResponse(ctx, Ok, byteBufOpt = Option(healthCheckMsg), channelFutureListenerOpt = CloseOpt)
+            } else {
+              simpleResponse(ctx, NotFound, channelFutureListenerOpt = CloseOpt)
+            }
+
+          case s if s.startsWith("/graphs/getEdge/") =>
+            // src, tgt, label, dir
+            val Array(srcId, tgtId, labelName, direction) = s.split("/").takeRight(4)
+            val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
+            val result = s2rest.checkEdges(params)
+            toResponse(ctx, req, params, result, startedAt)
+          case _ => badRoute(ctx)
+        }
+
+      case HttpMethod.PUT =>
+        if (uri.startsWith("/health_check/")) {
+          val newHealthCheck = uri.split("/").last.toBoolean
+          NettyServer.isHealthy = newHealthCheck
+          val newHealthCheckMsg = Unpooled.copiedBuffer(NettyServer.isHealthy.toString, CharsetUtil.UTF_8)
+          simpleResponse(ctx, Ok, byteBufOpt = Option(newHealthCheckMsg), channelFutureListenerOpt = CloseOpt)
+        } else badRoute(ctx)
+
+      case HttpMethod.POST =>
+        val jsonString = req.content.toString(CharsetUtil.UTF_8)
+        val jsQuery = Json.parse(jsonString)
+
+        val result = s2rest.doPost(uri, jsQuery)
+        toResponse(ctx, req, jsQuery, result, startedAt)
+
+      case _ =>
+        simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+    }
+  }
+
+  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
+    cause.printStackTrace()
+    logger.error(s"exception on query.", cause)
+    simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+  }
+}
+
+// Simple http server
+object NettyServer extends App {
+  /** Should stay the same as Bootstrap.onStart in the Play server. */
+
+  val numOfThread = Runtime.getRuntime.availableProcessors()
+  val threadPool = Executors.newFixedThreadPool(numOfThread)
+  val ec = ExecutionContext.fromExecutor(threadPool)
+
+  val config = ConfigFactory.load()
+  val port = Try(config.getInt("http.port")).recover { case _ => 9000 }.get
+
+  // init s2graph with config
+  val s2graph = new Graph(config)(ec)
+  val rest = new RestHandler(s2graph)(ec)
+
+  val deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get
+  var isHealthy = config.getBooleanWithFallback("app.health.on", true)
+
+  logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}")
+
+  // Configure the server.
+  val bossGroup: EventLoopGroup = new NioEventLoopGroup(1)
+  val workerGroup: EventLoopGroup = new NioEventLoopGroup()
+
+  try {
+    val b: ServerBootstrap = new ServerBootstrap()
+    b.option(ChannelOption.SO_BACKLOG, Int.box(2048))
+
+    b.group(bossGroup, workerGroup).channel(classOf[NioServerSocketChannel])
+      .handler(new LoggingHandler(LogLevel.INFO))
+      .childHandler(new ChannelInitializer[SocketChannel] {
+        override def initChannel(ch: SocketChannel) {
+          val p = ch.pipeline()
+          p.addLast(new HttpServerCodec())
+          p.addLast(new HttpObjectAggregator(65536))
+          p.addLast(new S2RestHandler(rest)(ec))
+        }
+      })
+
+    logger.info(s"Listening for HTTP on /0.0.0.0:$port")
+    val ch: Channel = b.bind(port).sync().channel()
+    ch.closeFuture().sync()
+
+  } finally {
+    bossGroup.shutdownGracefully()
+    workerGroup.shutdownGracefully()
+    s2graph.shutdown()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2rest_play/app/Bootstrap.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/Bootstrap.scala b/s2rest_play/app/Bootstrap.scala
index a1acb10..8005f79 100644
--- a/s2rest_play/app/Bootstrap.scala
+++ b/s2rest_play/app/Bootstrap.scala
@@ -3,7 +3,7 @@ package com.kakao.s2graph.rest
 import java.util.concurrent.Executors
 
 import actors.QueueActor
-import com.kakao.s2graph.core.rest.RequestParser
+import com.kakao.s2graph.core.rest._
 import com.kakao.s2graph.core.utils.logger
 import com.kakao.s2graph.core.{Management, ExceptionHandler, Graph}
 import config.Config
@@ -20,6 +20,7 @@ object Global extends WithFilters(new GzipFilter()) {
   var s2graph: Graph = _
   var storageManagement: Management = _
   var s2parser: RequestParser = _
+  var s2rest: RestHandler = _
 
   // Application entry point
   override def onStart(app: Application) {
@@ -35,6 +36,7 @@ object Global extends WithFilters(new GzipFilter()) {
     s2graph = new Graph(config)(ec)
     storageManagement = new Management(s2graph)
     s2parser = new RequestParser(s2graph.config) // merged config
+    s2rest = new RestHandler(s2graph)(ec)
 
     QueueActor.init(s2graph)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2rest_play/app/controllers/ApplicationController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/ApplicationController.scala b/s2rest_play/app/controllers/ApplicationController.scala
index fefe93e..d9384d9 100644
--- a/s2rest_play/app/controllers/ApplicationController.scala
+++ b/s2rest_play/app/controllers/ApplicationController.scala
@@ -1,5 +1,7 @@
 package controllers
 
+import com.kakao.s2graph.core.GraphExceptions.BadQueryException
+import com.kakao.s2graph.core.PostProcess
 import com.kakao.s2graph.core.utils.logger
 import play.api.libs.iteratee.Enumerator
 import play.api.libs.json.{JsString, JsValue}
@@ -15,6 +17,21 @@ object ApplicationController extends Controller {
 
   val jsonParser: BodyParser[JsValue] = controllers.s2parse.json
 
+  private def badQueryExceptionResults(ex: Exception) = // already-completed Future holding a 400 BadRequest whose body is PostProcess.badRequestResults(ex)
+    Future.successful(BadRequest(PostProcess.badRequestResults(ex)).as(applicationJsonHeader))
+
+  private def errorResults = // already-completed Future; note this is a 200 Ok carrying PostProcess.emptyResults, not an error status
+    Future.successful(Ok(PostProcess.emptyResults).as(applicationJsonHeader))
+
+  def requestFallback(body: JsValue): PartialFunction[Throwable, Future[Result]] = { // recovery handler for a failed request future; `body` is used for logging only
+    case e: BadQueryException => // rejected query: log it and answer 400 with the error rendered as JSON
+      logger.error(s"$body, ${e.getMessage}", e)
+      badQueryExceptionResults(e)
+    case e: Exception => // any other Exception: log it and answer with the empty-results payload
+      logger.error(s"$body, ${e.getMessage}", e)
+      errorResults
+  }
+
   def updateHealthCheck(isHealthy: Boolean) = Action { request =>
     this.isHealthy = isHealthy
     Ok(this.isHealthy + "\n")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2rest_play/app/controllers/ExperimentController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/ExperimentController.scala b/s2rest_play/app/controllers/ExperimentController.scala
index 5ae2379..40f67d1 100644
--- a/s2rest_play/app/controllers/ExperimentController.scala
+++ b/s2rest_play/app/controllers/ExperimentController.scala
@@ -1,114 +1,23 @@
 package controllers
 
 
-import java.net.URL
-
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.rest.RequestParser
-import com.kakao.s2graph.core.utils.logger
-import play.api.Play.current
-import play.api.libs.json.{JsObject, JsString, JsValue, Json}
-import play.api.libs.ws.WS
+import com.kakao.s2graph.core.rest.RestHandler
 import play.api.mvc._
 
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
 
 object ExperimentController extends Controller {
-  val impressionKey = "S2-Impression-Id"
+  private val rest: RestHandler = com.kakao.s2graph.rest.Global.s2rest
 
   import ApplicationController._
 
   def experiment(accessToken: String, experimentName: String, uuid: String) = withHeaderAsync(parse.anyContent) { request =>
-    val bucketOpt = for {
-      service <- Service.findByAccessToken(accessToken)
-      experiment <- Experiment.findBy(service.id.get, experimentName)
-      bucket <- experiment.findBucket(uuid)
-    } yield bucket
-
-    bucketOpt match {
-      case None => Future.successful(NotFound("bucket is not found."))
-      case Some(bucket) =>
-        try {
-          if (bucket.isGraphQuery) buildRequestInner(request, bucket, uuid)
-          else buildRequest(request, bucket, uuid)
-        } catch {
-          case e: Exception =>
-            logger.error(e.toString())
-            Future.successful(BadRequest(s"wrong or missing template parameter: ${e.getMessage}"))
-        }
-    }
-  }
-
-  def makeRequestJson(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): JsValue = {
-    var body = bucket.requestBody.replace("#uuid", uuid)
-
-    for {
-      requestKeyJson <- requestKeyJsonOpt
-      jsObj <- requestKeyJson.asOpt[JsObject]
-      (key, value) <- jsObj.fieldSet
-    } {
-      val replacement = value match {
-        case JsString(s) => s
-        case _ => value.toString
-      }
-      body = body.replace(key, replacement)
-    }
-
-    Json.parse(body)
-  }
-
-  private def buildRequestInner(request: Request[AnyContent], bucket: Bucket, uuid: String): Future[Result] = {
-    if (bucket.isEmpty) Future.successful(Ok(Json.obj("isEmpty" -> true)).withHeaders(impressionKey -> bucket.impressionId))
-    else {
-      val jsonBody = makeRequestJson(request.body.asJson, bucket, uuid)
-      val url = new URL(bucket.apiPath)
-      val path = url.getPath()
-
-      // dummy log for sampling
-      val experimentLog = s"POST $path took -1 ms 200 -1 $jsonBody"
-      logger.info(experimentLog)
-
-      val response = path match {
-        case "/graphs/getEdges" => controllers.QueryController.getEdgesInner(jsonBody)
-        case "/graphs/getEdges/grouped" => controllers.QueryController.getEdgesWithGroupingInner(jsonBody)
-        case "/graphs/getEdgesExcluded" => controllers.QueryController.getEdgesExcludedInner(jsonBody)
-        case "/graphs/getEdgesExcluded/grouped" => controllers.QueryController.getEdgesExcludedWithGroupingInner(jsonBody)
-        case "/graphs/checkEdges" => controllers.QueryController.checkEdgesInner(jsonBody)
-        case "/graphs/getEdgesGrouped" => controllers.QueryController.getEdgesGroupedInner(jsonBody)
-        case "/graphs/getEdgesGroupedExcluded" => controllers.QueryController.getEdgesGroupedExcludedInner(jsonBody)
-        case "/graphs/getEdgesGroupedExcludedFormatted" => controllers.QueryController.getEdgesGroupedExcludedFormattedInner(jsonBody)
-      }
-      response.map { r => r.withHeaders(impressionKey -> bucket.impressionId) }
-    }
-  }
-
-  private def toSimpleMap(map: Map[String, Seq[String]]): Map[String, String] = {
-    for {
-      (k, vs) <- map
-      headVal <- vs.headOption
-    } yield {
-      k -> headVal
-    }
-  }
-
-  private def buildRequest(request: Request[AnyContent], bucket: Bucket, uuid: String): Future[Result] = {
-    val jsonBody = makeRequestJson(request.body.asJson, bucket, uuid)
-
-    val url = bucket.apiPath
-    val headers = request.headers.toSimpleMap.toSeq
-    val verb = bucket.httpVerb.toUpperCase
-    val qs = toSimpleMap(request.queryString).toSeq
-
-    val ws = WS.url(url)
-      .withMethod(verb)
-      .withBody(jsonBody)
-      .withHeaders(headers: _*)
-      .withQueryString(qs: _*)
+    val body = request.body.asJson.getOrElse(play.api.libs.json.Json.obj()) // missing/non-JSON body => {} rather than a NoSuchElementException from .get; NOTE(review): confirm doPost accepts {}
+    val res = rest.doPost(request.uri, body)
 
-    ws.stream().map {
-      case (proxyResponse, proxyBody) =>
-        Result(ResponseHeader(proxyResponse.status, proxyResponse.headers.mapValues(_.toList.head)), proxyBody).withHeaders(impressionKey -> bucket.impressionId)
-    }
+    res.body.map { case js =>
+      val headers = res.headers :+ ("result_size" -> rest.calcSize(js).toString)
+      jsonResponse(js, headers: _*)
+    } recoverWith ApplicationController.requestFallback(body)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b63878c/s2rest_play/app/controllers/QueryController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/QueryController.scala b/s2rest_play/app/controllers/QueryController.scala
index 480c142..e836259 100644
--- a/s2rest_play/app/controllers/QueryController.scala
+++ b/s2rest_play/app/controllers/QueryController.scala
@@ -1,311 +1,48 @@
 package controllers
 
 
-import com.kakao.s2graph.core.GraphExceptions.BadQueryException
 import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.types.{LabelWithDirection, VertexId}
-import com.kakao.s2graph.core.utils.logger
-import config.Config
-import play.api.libs.json.{JsArray, JsObject, JsValue, Json}
-import play.api.mvc.{Action, Controller, Result}
+import com.kakao.s2graph.core.rest.RestHandler
+import play.api.libs.json.{JsValue, Json}
+import play.api.mvc.{Controller, Request}
 
-import scala.concurrent._
 import scala.language.postfixOps
-import scala.util.Try
 
-object QueryController extends Controller {
+object QueryController extends Controller with JSONParser {
 
   import ApplicationController._
   import play.api.libs.concurrent.Execution.Implicits.defaultContext
 
-  private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph
-  private val requestParser = com.kakao.s2graph.rest.Global.s2parser
+  private val rest: RestHandler = com.kakao.s2graph.rest.Global.s2rest
 
-  private def badQueryExceptionResults(ex: Exception) = Future.successful(BadRequest(Json.obj("message" -> ex.getMessage)).as(applicationJsonHeader))
+  def delegate(request: Request[JsValue]) = // shared handler: hands the request URI and JSON body to RestHandler.doPost
+    rest.doPost(request.uri, request.body).body.map { js =>
+      jsonResponse(js, "result_size" -> rest.calcSize(js).toString) // result_size pair is passed to jsonResponse alongside the JSON
+    } recoverWith ApplicationController.requestFallback(request.body) // failures are mapped to a response by requestFallback, which logs the body
 
-  private def errorResults = Future.successful(Ok(PostProcess.timeoutResults).as(applicationJsonHeader))
+  def getEdges() = withHeaderAsync(jsonParser)(delegate)
 
-  def getEdges() = withHeaderAsync(jsonParser) { request =>
-    getEdgesInner(request.body)
-  }
+  def getEdgesWithGrouping() = withHeaderAsync(jsonParser)(delegate)
 
-  def getEdgesExcluded = withHeaderAsync(jsonParser) { request =>
-    getEdgesExcludedInner(request.body)
-  }
+  def getEdgesExcluded() = withHeaderAsync(jsonParser)(delegate)
 
-  private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = {
-    val filterOutQueryResultsLs = q.filterOutQuery match {
-      case Some(filterOutQuery) => s2.getEdges(filterOutQuery)
-      case None => Future.successful(Seq.empty)
-    }
-
-    for {
-      queryResultsLs <- s2.getEdges(q)
-      filterOutResultsLs <- filterOutQueryResultsLs
-    } yield {
-      val json = post(queryResultsLs, filterOutResultsLs)
-      json
-    }
-  }
-
-  private def calcSize(js: JsValue): Int = js match {
-    case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0)
-    case JsArray(seq) => seq.map(js => (js \ "size").asOpt[Int].getOrElse(0)).sum
-    case _ => 0
-  }
-
-  private def getEdgesAsync(jsonQuery: JsValue)
-                           (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[Result] = {
-    if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader)
-    val fetch = eachQuery(post) _
-//    logger.info(jsonQuery)
-
-    Try {
-      val future = jsonQuery match {
-        case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray)
-        case obj@JsObject(_) => fetch(requestParser.toQuery(obj))
-        case _ => throw BadQueryException("Cannot support")
-      }
-
-      future map { json => jsonResponse(json, "result_size" -> calcSize(json).toString) }
-
-    } recover {
-      case e: BadQueryException =>
-        logger.error(s"$jsonQuery, $e", e)
-        badQueryExceptionResults(e)
-      case e: Exception =>
-        logger.error(s"$jsonQuery, $e", e)
-        errorResults
-    } get
-  }
-
-  @deprecated(message = "deprecated", since = "0.2")
-  private def getEdgesExcludedAsync(jsonQuery: JsValue)
-                                   (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[Result] = {
-
-    if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader)
-
-    Try {
-      val q = requestParser.toQuery(jsonQuery)
-      val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
-
-      val fetchFuture = s2.getEdges(q)
-      val excludeFuture = s2.getEdges(filterOutQuery)
-
-      for {
-        queryResultLs <- fetchFuture
-        exclude <- excludeFuture
-      } yield {
-        val json = post(queryResultLs, exclude)
-        jsonResponse(json, "result_size" -> calcSize(json).toString)
-      }
-    } recover {
-      case e: BadQueryException =>
-        logger.error(s"$jsonQuery, $e", e)
-        badQueryExceptionResults(e)
-      case e: Exception =>
-        logger.error(s"$jsonQuery, $e", e)
-        errorResults
-    } get
-  }
-
-  def getEdgesInner(jsonQuery: JsValue) = {
-    getEdgesAsync(jsonQuery)(PostProcess.toSimpleVertexArrJson)
-  }
-
-  def getEdgesExcludedInner(jsValue: JsValue) = {
-    getEdgesExcludedAsync(jsValue)(PostProcess.toSimpleVertexArrJson)
-  }
-
-  def getEdgesWithGrouping() = withHeaderAsync(jsonParser) { request =>
-    getEdgesWithGroupingInner(request.body)
-  }
-
-  def getEdgesWithGroupingInner(jsonQuery: JsValue) = {
-    getEdgesAsync(jsonQuery)(PostProcess.summarizeWithListFormatted)
-  }
-
-  def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonParser) { request =>
-    getEdgesExcludedWithGroupingInner(request.body)
-  }
-
-  def getEdgesExcludedWithGroupingInner(jsonQuery: JsValue) = {
-    getEdgesExcludedAsync(jsonQuery)(PostProcess.summarizeWithListExcludeFormatted)
-  }
+  def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonParser)(delegate)
 
-  def getEdgesGroupedInner(jsonQuery: JsValue) = {
-    getEdgesAsync(jsonQuery)(PostProcess.summarizeWithList)
-  }
+  def checkEdges() = withHeaderAsync(jsonParser)(delegate)
 
-  @deprecated(message = "deprecated", since = "0.2")
-  def getEdgesGrouped() = withHeaderAsync(jsonParser) { request =>
-    getEdgesGroupedInner(request.body)
-  }
+  def getEdgesGrouped() = withHeaderAsync(jsonParser)(delegate)
 
-  @deprecated(message = "deprecated", since = "0.2")
-  def getEdgesGroupedExcluded() = withHeaderAsync(jsonParser) { request =>
-    getEdgesGroupedExcludedInner(request.body)
-  }
+  def getEdgesGroupedExcluded() = withHeaderAsync(jsonParser)(delegate)
 
-  @deprecated(message = "deprecated", since = "0.2")
-  def getEdgesGroupedExcludedInner(jsonQuery: JsValue): Future[Result] = {
-    if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader)
+  def getEdgesGroupedExcludedFormatted() = withHeaderAsync(jsonParser)(delegate)
 
-    Try {
-      val q = requestParser.toQuery(jsonQuery)
-      val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
-
-      val fetchFuture = s2.getEdges(q)
-      val excludeFuture = s2.getEdges(filterOutQuery)
-
-      for {
-        queryResultLs <- fetchFuture
-        exclude <- excludeFuture
-      } yield {
-        val json = PostProcess.summarizeWithListExclude(queryResultLs, exclude)
-        jsonResponse(json, "result_size" -> calcSize(json).toString)
-      }
-    } recover {
-      case e: BadQueryException =>
-        logger.error(s"$jsonQuery, $e", e)
-        badQueryExceptionResults(e)
-      case e: Exception =>
-        logger.error(s"$jsonQuery, $e", e)
-        errorResults
-    } get
-  }
-
-  @deprecated(message = "deprecated", since = "0.2")
-  def getEdgesGroupedExcludedFormatted = withHeaderAsync(jsonParser) { request =>
-    getEdgesGroupedExcludedFormattedInner(request.body)
-  }
-
-  @deprecated(message = "deprecated", since = "0.2")
-  def getEdgesGroupedExcludedFormattedInner(jsonQuery: JsValue): Future[Result] = {
-    if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader)
-
-    Try {
-      val q = requestParser.toQuery(jsonQuery)
-      val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
-
-      val fetchFuture = s2.getEdges(q)
-      val excludeFuture = s2.getEdges(filterOutQuery)
-
-      for {
-        queryResultLs <- fetchFuture
-        exclude <- excludeFuture
-      } yield {
-        val json = PostProcess.summarizeWithListExcludeFormatted(queryResultLs, exclude)
-        jsonResponse(json, "result_size" -> calcSize(json).toString)
-      }
-    } recover {
-      case e: BadQueryException =>
-        logger.error(s"$jsonQuery, $e", e)
-        badQueryExceptionResults(e)
-      case e: Exception =>
-        logger.error(s"$jsonQuery, $e", e)
-        errorResults
-    } get
-  }
-
-  def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) = Action.async { request =>
-    if (!Config.IS_QUERY_SERVER) Future.successful(Unauthorized)
-    val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
-    checkEdgesInner(params)
-  }
-
-  /**
-   * Vertex
-   */
-
-  def checkEdgesInner(jsValue: JsValue) = {
-    try {
-      val params = jsValue.as[List[JsValue]]
-      var isReverted = false
-      val labelWithDirs = scala.collection.mutable.HashSet[LabelWithDirection]()
-      val quads = for {
-        param <- params
-        labelName <- (param \ "label").asOpt[String]
-        direction <- GraphUtil.toDir((param \ "direction").asOpt[String].getOrElse("out"))
-        label <- Label.findByName(labelName)
-        srcId <- requestParser.jsValueToInnerVal((param \ "from").as[JsValue], label.srcColumnWithDir(direction.toInt).columnType, label.schemaVersion)
-        tgtId <- requestParser.jsValueToInnerVal((param \ "to").as[JsValue], label.tgtColumnWithDir(direction.toInt).columnType, label.schemaVersion)
-      } yield {
-          val labelWithDir = LabelWithDirection(label.id.get, direction)
-          labelWithDirs += labelWithDir
-          val (src, tgt, dir) = if (direction == 1) {
-            isReverted = true
-            (Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)),
-              Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), 0)
-          } else {
-            (Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)),
-              Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), 0)
-          }
-
-          //          logger.debug(s"SrcVertex: $src")
-          //          logger.debug(s"TgtVertex: $tgt")
-          //          logger.debug(s"direction: $dir")
-          (src, tgt, QueryParam(LabelWithDirection(label.id.get, dir)))
-        }
-
-      s2.checkEdges(quads).map { case queryRequestWithResultLs =>
-        val edgeJsons = for {
-          queryRequestWithResult <- queryRequestWithResultLs
-          (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-          edgeWithScore <- queryResult.edgeWithScoreLs
-          (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-          convertedEdge = if (isReverted) edge.duplicateEdge else edge
-          edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam)
-        } yield Json.toJson(edgeJson)
-
-        val json = Json.toJson(edgeJsons)
-        jsonResponse(json, "result_size" -> edgeJsons.size.toString)
-      }
-    } catch {
-      case e: Exception =>
-        logger.error(s"$jsValue, $e", e)
-        errorResults
+  def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) = // single-edge lookup built from the four route arguments
+    withHeaderAsync(jsonParser) { request =>
+      val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
+      rest.checkEdges(params).body.map { js =>
+        jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
+      } recoverWith ApplicationController.requestFallback(params) // log the query that was actually run, not the unused request body
     }
-  }
-
-  def checkEdges() = withHeaderAsync(jsonParser) { request =>
-    if (!Config.IS_QUERY_SERVER) Future.successful(Unauthorized)
-
-    checkEdgesInner(request.body)
-  }
-
-  def getVertices() = withHeaderAsync(jsonParser) { request =>
-    getVerticesInner(request.body)
-  }
-
-  def getVerticesInner(jsValue: JsValue) = {
-    if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader)
-
-    val jsonQuery = jsValue
-    val ts = System.currentTimeMillis()
-    val props = "{}"
-
-    Try {
-      val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
-        val serviceName = (js \ "serviceName").as[String]
-        val columnName = (js \ "columnName").as[String]
-        for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield {
-          Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props)
-        }
-      }
 
-      s2.getVertices(vertices) map { vertices =>
-        val json = PostProcess.verticesToJson(vertices)
-        jsonResponse(json, "result_size" -> calcSize(json).toString)
-      }
-    } recover {
-      case e: play.api.libs.json.JsResultException =>
-        logger.error(s"$jsonQuery, $e", e)
-        badQueryExceptionResults(e)
-      case e: Exception =>
-        logger.error(s"$jsonQuery, $e", e)
-        errorResults
-    } get
-  }
+  def getVertices() = withHeaderAsync(jsonParser)(delegate)
 }